You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/03/15 21:23:08 UTC

git commit: SQOOP-903: Sqoop2: Add schema support to Generic JDBC Connector

Updated Branches:
  refs/heads/sqoop2 daccebac5 -> 57d4d3aa0


SQOOP-903: Sqoop2: Add schema support to Generic JDBC Connector

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/57d4d3aa
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/57d4d3aa
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/57d4d3aa

Branch: refs/heads/sqoop2
Commit: 57d4d3aa0290fa7cfcc82ce2814bdec908cc7014
Parents: dacceba
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Mar 15 13:22:07 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Mar 15 13:22:07 2013 -0700

----------------------------------------------------------------------
 .../jdbc/GenericJdbcExportInitializer.java         |   10 +-
 .../jdbc/GenericJdbcImportInitializer.java         |   15 +-
 .../sqoop/connector/jdbc/GenericJdbcValidator.java |    3 +
 .../jdbc/configuration/ExportTableForm.java        |    1 +
 .../jdbc/configuration/ImportTableForm.java        |    1 +
 .../generic-jdbc-connector-resources.properties    |    4 +
 .../connector/jdbc/TestExportInitializer.java      |  110 ++++++++--
 .../connector/jdbc/TestImportInitializer.java      |  173 ++++++++++++++-
 8 files changed, 285 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 520b0bb..40a7774 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -67,6 +67,7 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
   private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
     String dataSql;
 
+    String schemaName = jobConfig.table.schemaName;
     String tableName = jobConfig.table.tableName;
     String tableSql = jobConfig.table.sql;
     String tableColumns = jobConfig.table.columns;
@@ -79,12 +80,15 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
     } else if (tableName != null) {
       // when table name is specified:
 
+      // For databases that support schemas (IE: postgresql).
+      String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
       if (tableColumns == null) {
         String[] columns = executor.getQueryColumns("SELECT * FROM "
-            + executor.delimitIdentifier(tableName) + " WHERE 1 = 0");
+            + fullTableName + " WHERE 1 = 0");
         StringBuilder builder = new StringBuilder();
         builder.append("INSERT INTO ");
-        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(fullTableName);
         builder.append(" VALUES (?");
         for (int i = 1; i < columns.length; i++) {
           builder.append(",?");
@@ -96,7 +100,7 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
         String[] columns = StringUtils.split(tableColumns, ',');
         StringBuilder builder = new StringBuilder();
         builder.append("INSERT INTO ");
-        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(fullTableName);
         builder.append(" (");
         builder.append(tableColumns);
         builder.append(") VALUES (?");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 46c7ee7..3e9789c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -104,6 +104,7 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
     if (minMaxQuery == null) {
       StringBuilder builder = new StringBuilder();
 
+      String schemaName = jobConfig.table.schemaName;
       String tableName = jobConfig.table.tableName;
       String tableSql = jobConfig.table.sql;
 
@@ -114,13 +115,17 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
 
       } else if (tableName != null) {
         // when table name is specified:
+
+        // For databases that support schemas (IE: postgresql).
+        String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
         String column = partitionColumnName;
         builder.append("SELECT MIN(");
         builder.append(column);
         builder.append("), MAX(");
         builder.append(column);
         builder.append(") FROM ");
-        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(fullTableName);
 
       } else if (tableSql != null) {
         String column = executor.qualify(
@@ -177,6 +182,7 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
     String dataSql;
     String fieldNames;
 
+    String schemaName = jobConfig.table.schemaName;
     String tableName = jobConfig.table.tableName;
     String tableSql = jobConfig.table.sql;
     String tableColumns = jobConfig.table.columns;
@@ -189,10 +195,13 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
     } else if (tableName != null) {
       // when table name is specified:
 
+      // For databases that support schemas (IE: postgresql).
+      String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
       if (tableColumns == null) {
         StringBuilder builder = new StringBuilder();
         builder.append("SELECT * FROM ");
-        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(fullTableName);
         builder.append(" WHERE ");
         builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
         dataSql = builder.toString();
@@ -206,7 +215,7 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
         builder.append("SELECT ");
         builder.append(tableColumns);
         builder.append(" FROM ");
-        builder.append(executor.delimitIdentifier(tableName));
+        builder.append(fullTableName);
         builder.append(" WHERE ");
         builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
         dataSql = builder.toString();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index e098fbc..4e24517 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -102,6 +102,9 @@ public class GenericJdbcValidator extends Validator {
     if(configuration.table.tableName != null && configuration.table.sql != null) {
       validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified");
     }
+    if(configuration.table.schemaName != null && configuration.table.sql != null) {
+      validation.addMessage(Status.UNACCEPTABLE, "table", "Both schema name and SQL cannot be specified");
+    }
 
     if(configuration.table.sql != null && !configuration.table.sql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN)) {
       validation.addMessage(Status.UNACCEPTABLE, "table", "sql", "SQL statement must contain placeholder for auto generated "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
index 718d1fb..ee4bb6e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
@@ -25,6 +25,7 @@ import org.apache.sqoop.model.Input;
  */
 @FormClass
 public class ExportTableForm {
+  @Input(size = 50) public String schemaName;
   @Input(size = 50) public String tableName;
   @Input(size = 50) public String sql;
   @Input(size = 50) public String columns;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
index d150779..3422a8f 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
@@ -25,6 +25,7 @@ import org.apache.sqoop.model.Input;
  */
 @FormClass
 public class ImportTableForm {
+  @Input(size = 50) public String schemaName;
   @Input(size = 50) public String tableName;
   @Input(size = 50) public String sql;
   @Input(size = 50) public String columns;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
index 6ab4296..44fc984 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties
@@ -53,6 +53,10 @@ table.label = Database configuration
 table.help = You must supply the information requested in order to create \
                   a job object.
 
+# Schema name
+table.schemaName.label = Schema name
+table.schemaName.help = Schema name to process data in the remote database
+
 # Table name
 table.tableName.label = Table name
 table.tableName.help = Table name to process data in the remote database

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index bb0c23b..f83aaa2 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -28,15 +28,21 @@ import org.apache.sqoop.job.etl.InitializerContext;
 
 public class TestExportInitializer extends TestCase {
 
+  private final String schemaName;
   private final String tableName;
+  private final String schemalessTableName;
   private final String tableSql;
+  private final String schemalessTableSql;
   private final String tableColumns;
 
   private GenericJdbcExecutor executor;
 
   public TestExportInitializer() {
-    tableName = getClass().getSimpleName().toUpperCase();
+    schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
+    tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
+    schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
     tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)";
+    schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)";
     tableColumns = "ICOL,VCOL";
   }
 
@@ -45,10 +51,15 @@ public class TestExportInitializer extends TestCase {
     executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
         GenericJdbcTestConstants.URL, null, null);
 
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
     if (!executor.existTable(tableName)) {
-      executor.executeUpdate("CREATE TABLE "
-          + executor.delimitIdentifier(tableName)
-          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+      executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
+      executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+    }
+
+    fullTableName = executor.delimitIdentifier(schemalessTableName);
+    if (!executor.existTable(schemalessTableName)) {
+      executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
     }
   }
 
@@ -57,62 +68,131 @@ public class TestExportInitializer extends TestCase {
     executor.close();
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableName() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ExportJobConfiguration jobConf = new ExportJobConfiguration();
 
+    String fullTableName = executor.delimitIdentifier(schemalessTableName);
+
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.table.tableName = tableName;
+    jobConf.table.tableName = schemalessTableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcExportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
-    verifyResult(context,
-        "INSERT INTO " + executor.delimitIdentifier(tableName)
-            + " VALUES (?,?,?)");
+    verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumns() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ExportJobConfiguration jobConf = new ExportJobConfiguration();
 
+    String fullTableName = executor.delimitIdentifier(schemalessTableName);
+
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.table.tableName = tableName;
+    jobConf.table.tableName = schemalessTableName;
     jobConf.table.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcExportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
-    verifyResult(context,
-        "INSERT INTO " + executor.delimitIdentifier(tableName)
-            + " (" + tableColumns + ") VALUES (?,?)");
+    verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableSql() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ExportJobConfiguration jobConf = new ExportJobConfiguration();
 
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.sql = schemalessTableSql;
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableNameWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.tableName = tableName;
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableNameWithTableColumnsWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.tableName = tableName;
+    jobConf.table.columns = tableColumns;
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcExportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableSqlWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ExportJobConfiguration jobConf = new ExportJobConfiguration();
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
     jobConf.table.sql = tableSql;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcExportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
-    verifyResult(context,
-        "INSERT INTO " + executor.delimitIdentifier(tableName)
-            + " VALUES (?,?,?)");
+    verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
   }
 
   private void verifyResult(MutableContext context, String dataSql) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/57d4d3aa/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index 45835bd..9f4269a 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -31,8 +31,11 @@ import org.apache.sqoop.job.etl.InitializerContext;
 
 public class TestImportInitializer extends TestCase {
 
+  private final String schemaName;
   private final String tableName;
+  private final String schemalessTableName;
   private final String tableSql;
+  private final String schemalessTableSql;
   private final String tableColumns;
 
   private GenericJdbcExecutor executor;
@@ -41,8 +44,11 @@ public class TestImportInitializer extends TestCase {
   private static final int NUMBER_OF_ROWS = 101;
 
   public TestImportInitializer() {
-    tableName = getClass().getSimpleName().toUpperCase();
-    tableSql = "SELECT * FROM " + tableName + " WHERE ${CONDITIONS}";
+    schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA";
+    tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA";
+    schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
+    tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
+    schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
     tableColumns = "ICOL,VCOL";
   }
 
@@ -51,14 +57,30 @@ public class TestImportInitializer extends TestCase {
     executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
         GenericJdbcTestConstants.URL, null, null);
 
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
     if (!executor.existTable(tableName)) {
+      executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
       executor.executeUpdate("CREATE TABLE "
-          + executor.delimitIdentifier(tableName)
+          + fullTableName
           + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
 
       for (int i = 0; i < NUMBER_OF_ROWS; i++) {
         int value = START + i;
-        String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
+        String sql = "INSERT INTO " + fullTableName
+            + " VALUES(" + value + ", " + value + ", '" + value + "')";
+        executor.executeUpdate(sql);
+      }
+    }
+
+    fullTableName = executor.delimitIdentifier(schemalessTableName);
+    if (!executor.existTable(schemalessTableName)) {
+      executor.executeUpdate("CREATE TABLE "
+          + fullTableName
+          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+
+      for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+        int value = START + i;
+        String sql = "INSERT INTO " + fullTableName
             + " VALUES(" + value + ", " + value + ", '" + value + "')";
         executor.executeUpdate(sql);
       }
@@ -70,22 +92,24 @@ public class TestImportInitializer extends TestCase {
     executor.close();
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableName() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.table.tableName = tableName;
+    jobConf.table.tableName = schemalessTableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcImportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(tableName)
+        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
             + " WHERE ${CONDITIONS}",
         "ICOL,DCOL,VCOL",
         "ICOL",
@@ -94,23 +118,25 @@ public class TestImportInitializer extends TestCase {
         String.valueOf(START+NUMBER_OF_ROWS-1));
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumns() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.table.tableName = tableName;
+    jobConf.table.tableName = schemalessTableName;
     jobConf.table.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcImportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
-        "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName)
+        "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
             + " WHERE ${CONDITIONS}",
         tableColumns,
         "ICOL",
@@ -119,23 +145,25 @@ public class TestImportInitializer extends TestCase {
         String.valueOf(START+NUMBER_OF_ROWS-1));
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableSql() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.table.sql = tableSql;
+    jobConf.table.sql = schemalessTableSql;
     jobConf.table.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcImportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(tableName)
+        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
             + " WHERE ${CONDITIONS}",
         "ICOL,DCOL,VCOL",
         "DCOL",
@@ -144,12 +172,134 @@ public class TestImportInitializer extends TestCase {
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
   }
 
+  @SuppressWarnings("unchecked")
   public void testTableSqlWithTableColumns() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.sql = schemalessTableSql;
+    jobConf.table.columns = tableColumns;
+    jobConf.table.partitionColumn = "DCOL";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context,
+        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
+            + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
+            + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
+        tableColumns,
+        "DCOL",
+        String.valueOf(Types.DOUBLE),
+        String.valueOf((double)START),
+        String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableNameWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.tableName = tableName;
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context,
+        "SELECT * FROM " + fullTableName
+            + " WHERE ${CONDITIONS}",
+        "ICOL,DCOL,VCOL",
+        "ICOL",
+        String.valueOf(Types.INTEGER),
+        String.valueOf(START),
+        String.valueOf(START+NUMBER_OF_ROWS-1));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableNameWithTableColumnsWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.tableName = tableName;
+    jobConf.table.columns = tableColumns;
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context,
+        "SELECT ICOL,VCOL FROM " + fullTableName
+            + " WHERE ${CONDITIONS}",
+        tableColumns,
+        "ICOL",
+        String.valueOf(Types.INTEGER),
+        String.valueOf(START),
+        String.valueOf(START+NUMBER_OF_ROWS-1));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableSqlWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.sql = tableSql;
+    jobConf.table.partitionColumn = "DCOL";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+
+    verifyResult(context,
+        "SELECT * FROM " + fullTableName
+            + " WHERE ${CONDITIONS}",
+        "ICOL,DCOL,VCOL",
+        "DCOL",
+        String.valueOf(Types.DOUBLE),
+        String.valueOf((double)START),
+        String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testTableSqlWithTableColumnsWithSchema() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
     jobConf.table.sql = tableSql;
     jobConf.table.columns = tableColumns;
     jobConf.table.partitionColumn = "DCOL";
@@ -157,12 +307,13 @@ public class TestImportInitializer extends TestCase {
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
+    @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcImportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
-            + "(SELECT * FROM " + executor.delimitIdentifier(tableName)
+            + "(SELECT * FROM " + fullTableName
             + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
         tableColumns,
         "DCOL",