You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/03/15 21:23:08 UTC
git commit: SQOOP-903: Sqoop2: Add schema support to Generic JDBC
Connector
Updated Branches:
refs/heads/sqoop2 daccebac5 -> 57d4d3aa0
SQOOP-903: Sqoop2: Add schema support to Generic JDBC Connector
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/57d4d3aa
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/57d4d3aa
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/57d4d3aa
Branch: refs/heads/sqoop2
Commit: 57d4d3aa0290fa7cfcc82ce2814bdec908cc7014
Parents: dacceba
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Mar 15 13:22:07 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Mar 15 13:22:07 2013 -0700
----------------------------------------------------------------------
.../jdbc/GenericJdbcExportInitializer.java | 10 +-
.../jdbc/GenericJdbcImportInitializer.java | 15 +-
.../sqoop/connector/jdbc/GenericJdbcValidator.java | 3 +
.../jdbc/configuration/ExportTableForm.java | 1 +
.../jdbc/configuration/ImportTableForm.java | 1 +
.../generic-jdbc-connector-resources.properties | 4 +
.../connector/jdbc/TestExportInitializer.java | 110 ++++++++--
.../connector/jdbc/TestImportInitializer.java | 173 ++++++++++++++-
8 files changed, 285 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 520b0bb..40a7774 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -67,6 +67,7 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
String dataSql;
+ String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
@@ -79,12 +80,15 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
} else if (tableName != null) {
// when table name is specified:
+ // For databases that support schemas (IE: postgresql).
+ String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
if (tableColumns == null) {
String[] columns = executor.getQueryColumns("SELECT * FROM "
- + executor.delimitIdentifier(tableName) + " WHERE 1 = 0");
+ + fullTableName + " WHERE 1 = 0");
StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO ");
- builder.append(executor.delimitIdentifier(tableName));
+ builder.append(fullTableName);
builder.append(" VALUES (?");
for (int i = 1; i < columns.length; i++) {
builder.append(",?");
@@ -96,7 +100,7 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
String[] columns = StringUtils.split(tableColumns, ',');
StringBuilder builder = new StringBuilder();
builder.append("INSERT INTO ");
- builder.append(executor.delimitIdentifier(tableName));
+ builder.append(fullTableName);
builder.append(" (");
builder.append(tableColumns);
builder.append(") VALUES (?");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 46c7ee7..3e9789c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -104,6 +104,7 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
if (minMaxQuery == null) {
StringBuilder builder = new StringBuilder();
+ String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
@@ -114,13 +115,17 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
} else if (tableName != null) {
// when table name is specified:
+
+ // For databases that support schemas (IE: postgresql).
+ String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
String column = partitionColumnName;
builder.append("SELECT MIN(");
builder.append(column);
builder.append("), MAX(");
builder.append(column);
builder.append(") FROM ");
- builder.append(executor.delimitIdentifier(tableName));
+ builder.append(fullTableName);
} else if (tableSql != null) {
String column = executor.qualify(
@@ -177,6 +182,7 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
String dataSql;
String fieldNames;
+ String schemaName = jobConfig.table.schemaName;
String tableName = jobConfig.table.tableName;
String tableSql = jobConfig.table.sql;
String tableColumns = jobConfig.table.columns;
@@ -189,10 +195,13 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
} else if (tableName != null) {
// when table name is specified:
+ // For databases that support schemas (IE: postgresql).
+ String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
if (tableColumns == null) {
StringBuilder builder = new StringBuilder();
builder.append("SELECT * FROM ");
- builder.append(executor.delimitIdentifier(tableName));
+ builder.append(fullTableName);
builder.append(" WHERE ");
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
dataSql = builder.toString();
@@ -206,7 +215,7 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
builder.append("SELECT ");
builder.append(tableColumns);
builder.append(" FROM ");
- builder.append(executor.delimitIdentifier(tableName));
+ builder.append(fullTableName);
builder.append(" WHERE ");
builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
dataSql = builder.toString();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index e098fbc..4e24517 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -102,6 +102,9 @@ public class GenericJdbcValidator extends Validator {
if(configuration.table.tableName != null && configuration.table.sql != null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
}
+ if(configuration.table.schemaName != null && configuration.table.sql != null) {
+ validation.addMessage(Status.UNACCEPTABLE, "table", "Both schema name and SQL cannot be specified");
+ }
if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated "
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
index 718d1fb..ee4bb6e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
@@ -25,6 +25,7 @@ import org.apache.sqoop.model.Input;
*/
@FormClass
public class ExportTableForm {
+ @Input(size = 50) public String schemaName;
@Input(size = 50) public String tableName;
@Input(size = 50) public String sql;
@Input(size = 50) public String columns;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
index d150779..3422a8f 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
@@ -25,6 +25,7 @@ import org.apache.sqoop.model.Input;
*/
@FormClass
public class ImportTableForm {
+ @Input(size = 50) public String schemaName;
@Input(size = 50) public String tableName;
@Input(size = 50) public String sql;
@Input(size = 50) public String columns;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
index 6ab4296..44fc984 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
@@ -53,6 +53,10 @@ table.label = Database configuration
table.help = You must supply the information requested in order to create \
a job object.
+# Schema name
+table.schemaName.label = Schema name
+table.schemaName.help = Schema name to process data in the remote database
+
# Table name
table.tableName.label = Table name
table.tableName.help = Table name to process data in the remote database
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index bb0c23b..f83aaa2 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -28,15 +28,21 @@ import org.apache.sqoop.job.etl.InitializerContext;
public class TestExportInitializer extends TestCase {
+ private final String schemaName;
private final String tableName;
+ private final String schemalessTableName;
private final String tableSql;
+ private final String schemalessTableSql;
private final String tableColumns;
private GenericJdbcExecutor executor;
public TestExportInitializer() {
- tableName = getClass().getSimpleName().toUpperCase();
+ schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
+ tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
+ schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
+ schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
tableColumns = "ICOL,VCOL";
}
@@ -45,10 +51,15 @@ public class TestExportInitializer extends TestCase {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) {
- executor.executeUpdate("CREATE TABLE "
- + executor.delimitIdentifier(tableName)
- + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+ executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
+ executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+ }
+
+ fullTableName = executor.delimitIdentifier(schemalessTableName);
+ if (!executor.existTable(schemalessTableName)) {
+ executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
}
}
@@ -57,62 +68,131 @@ public class TestExportInitializer extends TestCase {
executor.close();
}
+ @SuppressWarnings("unchecked")
public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
+ String fullTableName = executor.delimitIdentifier(schemalessTableName);
+
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
- jobConf.table.tableName = tableName;
+ jobConf.table.tableName = schemalessTableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
- verifyResult(context,
- "INSERT INTO " + executor.delimitIdentifier(tableName)
- + " VALUES (?,?,?)");
+ verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
}
+ @SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
+ String fullTableName = executor.delimitIdentifier(schemalessTableName);
+
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
- jobConf.table.tableName = tableName;
+ jobConf.table.tableName = schemalessTableName;
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
- verifyResult(context,
- "INSERT INTO " + executor.delimitIdentifier(tableName)
- + " (" + tableColumns + ") VALUES (?,?)");
+ verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
}
+ @SuppressWarnings("unchecked")
public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ExportJobConfiguration jobConf = new ExportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.sql = schemalessTableSql;
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableNameWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
+ jobConf.table.tableName = tableName;
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableNameWithTableColumnsWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
+ jobConf.table.tableName = tableName;
+ jobConf.table.columns = tableColumns;
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcExportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableSqlWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
jobConf.table.sql = tableSql;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcExportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
- verifyResult(context,
- "INSERT INTO " + executor.delimitIdentifier(tableName)
- + " VALUES (?,?,?)");
+ verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
}
private void verifyResult(MutableContext context, String dataSql) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index 45835bd..9f4269a 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -31,8 +31,11 @@ import org.apache.sqoop.job.etl.InitializerContext;
public class TestImportInitializer extends TestCase {
+ private final String schemaName;
private final String tableName;
+ private final String schemalessTableName;
private final String tableSql;
+ private final String schemalessTableSql;
private final String tableColumns;
private GenericJdbcExecutor executor;
@@ -41,8 +44,11 @@ public class TestImportInitializer extends TestCase {
private static final int NUMBER_OF_ROWS = 101;
public TestImportInitializer() {
- tableName = getClass().getSimpleName().toUpperCase();
- tableSql = "SELECT * FROM " + tableName + " WHERE ${CONDITIONS}";
+ schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
+ tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
+ schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
+ tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
+ schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
tableColumns = "ICOL,VCOL";
}
@@ -51,14 +57,30 @@ public class TestImportInitializer extends TestCase {
executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
GenericJdbcTestConstants.URL, null, null);
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
if (!executor.existTable(tableName)) {
+ executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
executor.executeUpdate("CREATE TABLE "
- + executor.delimitIdentifier(tableName)
+ + fullTableName
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
- String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
+ String sql = "INSERT INTO " + fullTableName
+ + " VALUES(" + value + ", " + value + ", '" + value + "')";
+ executor.executeUpdate(sql);
+ }
+ }
+
+ fullTableName = executor.delimitIdentifier(schemalessTableName);
+ if (!executor.existTable(schemalessTableName)) {
+ executor.executeUpdate("CREATE TABLE "
+ + fullTableName
+ + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+
+ for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+ int value = START + i;
+ String sql = "INSERT INTO " + fullTableName
+ " VALUES(" + value + ", " + value + ", '" + value + "')";
executor.executeUpdate(sql);
}
@@ -70,22 +92,24 @@ public class TestImportInitializer extends TestCase {
executor.close();
}
+ @SuppressWarnings("unchecked")
public void testTableName() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
- jobConf.table.tableName = tableName;
+ jobConf.table.tableName = schemalessTableName;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
- "SELECT * FROM " + executor.delimitIdentifier(tableName)
+ "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"ICOL",
@@ -94,23 +118,25 @@ public class TestImportInitializer extends TestCase {
String.valueOf(START+NUMBER_OF_ROWS-1));
}
+ @SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
- jobConf.table.tableName = tableName;
+ jobConf.table.tableName = schemalessTableName;
jobConf.table.columns = tableColumns;
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
- "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName)
+ "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}",
tableColumns,
"ICOL",
@@ -119,23 +145,25 @@ public class TestImportInitializer extends TestCase {
String.valueOf(START+NUMBER_OF_ROWS-1));
}
+ @SuppressWarnings("unchecked")
public void testTableSql() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
- jobConf.table.sql = tableSql;
+ jobConf.table.sql = schemalessTableSql;
jobConf.table.partitionColumn = "DCOL";
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
- "SELECT * FROM " + executor.delimitIdentifier(tableName)
+ "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ " WHERE ${CONDITIONS}",
"ICOL,DCOL,VCOL",
"DCOL",
@@ -144,12 +172,134 @@ public class TestImportInitializer extends TestCase {
String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
}
+ @SuppressWarnings("unchecked")
public void testTableSqlWithTableColumns() throws Exception {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.sql = schemalessTableSql;
+ jobConf.table.columns = tableColumns;
+ jobConf.table.partitionColumn = "DCOL";
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcImportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context,
+ "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+ + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+ + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
+ tableColumns,
+ "DCOL",
+ String.valueOf(Types.DOUBLE),
+ String.valueOf((double)START),
+ String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableNameWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
+ jobConf.table.tableName = tableName;
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcImportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context,
+ "SELECT * FROM " + fullTableName
+ + " WHERE ${CONDITIONS}",
+ "ICOL,DCOL,VCOL",
+ "ICOL",
+ String.valueOf(Types.INTEGER),
+ String.valueOf(START),
+ String.valueOf(START+NUMBER_OF_ROWS-1));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableNameWithTableColumnsWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
+ jobConf.table.tableName = tableName;
+ jobConf.table.columns = tableColumns;
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcImportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context,
+ "SELECT ICOL,VCOL FROM " + fullTableName
+ + " WHERE ${CONDITIONS}",
+ tableColumns,
+ "ICOL",
+ String.valueOf(Types.INTEGER),
+ String.valueOf(START),
+ String.valueOf(START+NUMBER_OF_ROWS-1));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableSqlWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
+ jobConf.table.sql = tableSql;
+ jobConf.table.partitionColumn = "DCOL";
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcImportInitializer();
+ initializer.initialize(initializerContext, connConf, jobConf);
+
+ verifyResult(context,
+ "SELECT * FROM " + fullTableName
+ + " WHERE ${CONDITIONS}",
+ "ICOL,DCOL,VCOL",
+ "DCOL",
+ String.valueOf(Types.DOUBLE),
+ String.valueOf((double)START),
+ String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testTableSqlWithTableColumnsWithSchema() throws Exception {
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+ String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+ jobConf.table.schemaName = schemaName;
jobConf.table.sql = tableSql;
jobConf.table.columns = tableColumns;
jobConf.table.partitionColumn = "DCOL";
@@ -157,12 +307,13 @@ public class TestImportInitializer extends TestCase {
MutableContext context = new MutableMapContext();
InitializerContext initializerContext = new InitializerContext(context);
+ @SuppressWarnings("rawtypes")
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
verifyResult(context,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
- + "(SELECT * FROM " + executor.delimitIdentifier(tableName)
+ + "(SELECT * FROM " + fullTableName
+ " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
tableColumns,
"DCOL",