You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/08/10 22:45:14 UTC

sqoop git commit: SQOOP-2244: Sqoop2: Generic JDBC: Automatically escape table and column names from configuration objects

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 7202fe3a5 -> 3362fbb1b


SQOOP-2244: Sqoop2: Generic JDBC: Automatically escape table and column names from configuration objects

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3362fbb1
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3362fbb1
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3362fbb1

Branch: refs/heads/sqoop2
Commit: 3362fbb1b1369f598f9d88860989bf859bd5af0d
Parents: 7202fe3
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Aug 10 13:44:17 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Aug 10 13:44:17 2015 -0700

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcExecutor.java     | 145 +++++++++++++------
 .../jdbc/GenericJdbcFromInitializer.java        |  71 +++------
 .../jdbc/GenericJdbcToInitializer.java          |  12 +-
 .../jdbc/configuration/LinkConfiguration.java   |   3 +
 .../jdbc/configuration/SqlDialect.java          |  33 +++++
 .../generic-jdbc-connector-config.properties    |   7 +
 .../connector/jdbc/GenericJdbcExecutorTest.java |  90 +++++++++---
 .../jdbc/GenericJdbcTestConstants.java          |   5 +
 .../sqoop/connector/jdbc/TestExtractor.java     |  16 +-
 .../connector/jdbc/TestFromInitializer.java     | 128 ++++++++--------
 .../apache/sqoop/connector/jdbc/TestLoader.java |   7 +-
 .../sqoop/connector/jdbc/TestToInitializer.java |  24 +--
 .../sqoop/test/testcases/ConnectorTestCase.java |   6 +-
 .../jdbc/generic/FromRDBMSToHDFSTest.java       |   8 +-
 .../jdbc/generic/IncrementalReadTest.java       |   6 +-
 .../jdbc/generic/TableStagedRDBMSTest.java      |   2 +-
 16 files changed, 337 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index cab0917..1aeca7e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -17,9 +17,9 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
 import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
 import org.apache.sqoop.error.code.GenericJdbcConnectorError;
 import org.apache.sqoop.schema.Schema;
@@ -38,6 +38,8 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -112,6 +114,11 @@ public class GenericJdbcExecutor {
       logSQLException(e);
       throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0001, e);
     }
+
+    // Fill in defaults if they were not pre-entered by user
+    if(link.dialect.identifierEnclose == null) {
+      link.dialect.identifierEnclose = "\"";
+    }
   }
 
   public ResultSet executeQuery(String sql) {
@@ -144,14 +151,69 @@ public class GenericJdbcExecutor {
     }
   }
 
+  /**
+   *  Enclose given identifier based on the configuration from user.
+   *
+   * @param identifier Identifier to enclose
+   * @return Enclosed variant
+   */
+  public String encloseIdentifier(String identifier) {
+    assert identifier != null;
+    return link.dialect.identifierEnclose + identifier + link.dialect.identifierEnclose;
+  }
+
+  /**
+   * Enclose multiple identifiers and join them together to one string.
+   *
+   * Used to convert (schema, table) to string "schema"."table" needed for SQL queries.
+   *
+   * @param identifiers  Identifiers to enclose
+   * @return Enclosed identifiers joined with "."
+   */
+  public String encloseIdentifiers(String ...identifiers) {
+    assert identifiers != null;
+
+    List<String> enclosedIdentifiers = new LinkedList<String>();
+    for(String identifier: identifiers) {
+      if(identifier != null) {
+        enclosedIdentifiers.add(encloseIdentifier(identifier));
+      }
+    }
+
+    return StringUtils.join(enclosedIdentifiers, ".");
+  }
+
+  /**
+   * Create column list fragment for SELECT SQL.
+   *
+   * For example for (id, text, date), will automatically escape the column
+   * names and return one string:
+   *   "id", "text", "date"
+   *
+   * This method won't work correctly if the column name contains an expression
+   * or anything else beyond just a column name.
+   *
+   * @param columnNames Column names to escape and join.
+   * @return Escaped column names joined with ", "
+   */
+  public String columnList(String ...columnNames) {
+    assert columnNames != null;
+
+    List<String> escapedColumns = new LinkedList<String>();
+    for(String column : columnNames) {
+      escapedColumns.add(encloseIdentifier(column));
+    }
+
+    return StringUtils.join(escapedColumns, ", ");
+  }
+
   public void deleteTableData(String tableName) {
     LOG.info("Deleting all the rows from: " + tableName);
-    executeUpdate("DELETE FROM " + tableName);
+    executeUpdate("DELETE FROM " + encloseIdentifier(tableName));
   }
 
   public void migrateData(String fromTable, String toTable) {
-    String insertQuery = "INSERT INTO " + toTable +
-      " SELECT * FROM " + fromTable;
+    String insertQuery = "INSERT INTO " + encloseIdentifier(toTable) + " SELECT * FROM " + encloseIdentifier(fromTable);
     Statement stmt = null;
     Boolean oldAutoCommit = null;
     try {
@@ -196,7 +258,7 @@ public class GenericJdbcExecutor {
   }
 
   public long getTableRowCount(String tableName) {
-    ResultSet resultSet = executeQuery("SELECT COUNT(1) FROM " + tableName);
+    ResultSet resultSet = executeQuery("SELECT COUNT(1) FROM " + encloseIdentifier(tableName));
     try {
       resultSet.next();
       return resultSet.getLong(1);
@@ -295,11 +357,26 @@ public class GenericJdbcExecutor {
     }
   }
 
-  public String getPrimaryKey(String table) {
+  /**
+   * Return primary key for given table.
+   *
+   * @param identifiers Identifiers that are used to build the table's name. Following
+   *                    variants are accepted:
+   *                    * (catalog, schema, table)
+   *                    * (schema, table)
+   *                    * (table)
+   *                    Return value of any other combination is "undefined".
+   * @return Primary key's name
+   */
+  public String getPrimaryKey(String ...identifiers) {
+    int index = 0;
+    String catalog = identifiers.length >= 3 ? identifiers[index++] : null;
+    String schema = identifiers.length >= 2 ? identifiers[index++] : null;
+    String table = identifiers[index];
+
     try {
-      String[] splitNames = dequalify(table);
       DatabaseMetaData dbmd = connection.getMetaData();
-      ResultSet rs = dbmd.getPrimaryKeys(null, splitNames[0], splitNames[1]);
+      ResultSet rs = dbmd.getPrimaryKeys(catalog, schema, table);
 
       if (rs != null && rs.next()) {
         return rs.getString("COLUMN_NAME");
@@ -335,12 +412,26 @@ public class GenericJdbcExecutor {
     }
   }
 
-  public boolean existTable(String table) {
-    try {
-      String[] splitNames = dequalify(table);
+  /**
+   * Verifies existence of table in the database.
+   *
+   * @param identifiers Identifiers that are used to build the table's name. Following
+   *                    variants are accepted:
+   *                    * (catalog, schema, table)
+   *                    * (schema, table)
+   *                    * (table)
+   *                    Return value of any other combination is "undefined".
+   * @return True if given table exists
+   */
+  public boolean existTable(String ...identifiers) {
+    int index = 0;
+    String catalog = identifiers.length >= 3 ? identifiers[index++] : null;
+    String schema = identifiers.length >= 2 ? identifiers[index++] : null;
+    String table = identifiers[index];
 
+    try {
       DatabaseMetaData dbmd = connection.getMetaData();
-      ResultSet rs = dbmd.getTables(null, splitNames[0], splitNames[1], null);
+      ResultSet rs = dbmd.getTables(catalog, schema, table, null);
 
       if (rs.next()) {
         return true;
@@ -354,36 +445,6 @@ public class GenericJdbcExecutor {
     }
   }
 
-  /*
-   * If not qualified already, the name will be added with the qualifier.
-   * If qualified already, old qualifier will be replaced.
-   */
-  public String qualify(String name, String qualifier) {
-    String[] splits = dequalify(name);
-    return qualifier + "." + splits[1];
-  }
-
-  /*
-   * Split the name into a qualifier (element 0) and a base (element 1).
-   */
-  public String[] dequalify(String name) {
-    String qualifier;
-    String base;
-    int dot = name.indexOf(".");
-    if (dot != -1) {
-      qualifier = name.substring(0, dot);
-      base = name.substring(dot + 1);
-    } else {
-      qualifier = null;
-      base = name;
-    }
-    return new String[] {qualifier, base};
-  }
-
-  public String delimitIdentifier(String name) {
-    return name;
-  }
-
   public void close() {
     try {
       connection.close();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 20fabf6..9d8e4e7 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -70,11 +70,11 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
   public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
     executor = new GenericJdbcExecutor(linkConfig);
 
-    String schemaName = fromJobConfig.fromJobConfig.tableName;
-    if(schemaName == null) {
+    String schemaName;
+    if(fromJobConfig.fromJobConfig.tableName != null) {
+      schemaName = executor.encloseIdentifiers(fromJobConfig.fromJobConfig.schemaName, fromJobConfig.fromJobConfig.tableName);
+    } else {
       schemaName = "Query";
-    } else if(fromJobConfig.fromJobConfig.schemaName != null) {
-      schemaName = fromJobConfig.fromJobConfig.schemaName + "." + schemaName;
     }
 
     Schema schema = new Schema(schemaName);
@@ -135,11 +135,11 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     String partitionColumnName = jobConf.fromJobConfig.partitionColumn;
     // If it's not specified, we can use primary key of given table (if it's table based import)
     if (StringUtils.isBlank(partitionColumnName) && tableImport) {
-        partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.tableName);
+        partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName);
     }
     // If we don't have partition column name, we will error out
     if (partitionColumnName != null) {
-      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, partitionColumnName);
+      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, executor.encloseIdentifier(partitionColumnName));
     } else {
       throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
     }
@@ -148,13 +148,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     // From fragment for subsequent queries
     String fromFragment;
     if(tableImport) {
-      String tableName = jobConf.fromJobConfig.tableName;
-      String schemaName = jobConf.fromJobConfig.schemaName;
-
-      fromFragment = executor.delimitIdentifier(tableName);
-      if(schemaName != null) {
-        fromFragment = executor.delimitIdentifier(schemaName) + "." + fromFragment;
-      }
+      fromFragment = executor.encloseIdentifiers(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName);
     } else {
       sb.setLength(0);
       sb.append("(");
@@ -169,7 +163,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     if(incrementalImport) {
       sb.setLength(0);
       sb.append("SELECT ");
-      sb.append("MAX(").append(jobConf.incrementalRead.checkColumn).append(") ");
+      sb.append("MAX(").append(executor.encloseIdentifier(jobConf.incrementalRead.checkColumn)).append(") ");
       sb.append("FROM ");
       sb.append(fromFragment);
 
@@ -199,15 +193,15 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     if (minMaxQuery == null) {
       sb.setLength(0);
       sb.append("SELECT ");
-      sb.append("MIN(").append(partitionColumnName).append("), ");
-      sb.append("MAX(").append(partitionColumnName).append(") ");
+      sb.append("MIN(").append(executor.encloseIdentifier(partitionColumnName)).append("), ");
+      sb.append("MAX(").append(executor.encloseIdentifier(partitionColumnName)).append(") ");
       sb.append("FROM ").append(fromFragment).append(" ");
 
       if(incrementalImport) {
         sb.append("WHERE ");
-        sb.append(jobConf.incrementalRead.checkColumn).append(" > ?");
+        sb.append(executor.encloseIdentifier(jobConf.incrementalRead.checkColumn)).append(" > ?");
         sb.append(" AND ");
-        sb.append(jobConf.incrementalRead.checkColumn).append(" <= ?");
+        sb.append(executor.encloseIdentifier(jobConf.incrementalRead.checkColumn)).append(" <= ?");
       }
 
       minMaxQuery = sb.toString();
@@ -263,16 +257,12 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     String tableSql = fromJobConfig.fromJobConfig.sql;
     String tableColumns = fromJobConfig.fromJobConfig.columns;
 
-    if (tableName != null && tableSql != null) {
-      // when both fromTable name and fromTable sql are specified:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
-    } else if (tableName != null) {
-      // when fromTable name is specified:
+    // Assertion that should be true based on our validations
+    assert (tableName != null && tableSql == null) || (tableName == null && tableSql != null);
 
+    if (tableName != null) {
       // For databases that support schemas (IE: postgresql).
-      String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+      String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
 
       if (tableColumns == null) {
         StringBuilder builder = new StringBuilder();
@@ -282,10 +272,8 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
         builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
         dataSql = builder.toString();
 
-        String[] queryColumns = executor.getQueryColumns(dataSql.replace(
-            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
-        fieldNames = StringUtils.join(queryColumns, ',');
-
+        String[] queryColumns = executor.getQueryColumns(dataSql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
+        fieldNames = executor.columnList(queryColumns);
       } else {
         StringBuilder builder = new StringBuilder();
         builder.append("SELECT ");
@@ -298,29 +286,18 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
 
         fieldNames = tableColumns;
       }
-    } else if (tableSql != null) {
-      // when fromTable sql is specified:
-
+    } else {
       assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
 
       if (tableColumns == null) {
         dataSql = tableSql;
 
-        String[] queryColumns = executor.getQueryColumns(dataSql.replace(
-            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
-        fieldNames = StringUtils.join(queryColumns, ',');
-
+        String[] queryColumns = executor.getQueryColumns(dataSql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
+        fieldNames = executor.columnList(queryColumns);
       } else {
-        String[] columns = StringUtils.split(tableColumns, ',');
         StringBuilder builder = new StringBuilder();
         builder.append("SELECT ");
-        builder.append(executor.qualify(
-            columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
-        for (int i = 1; i < columns.length; i++) {
-          builder.append(",");
-          builder.append(executor.qualify(
-              columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
-        }
+        builder.append(tableColumns);
         builder.append(" FROM ");
         builder.append("(");
         builder.append(tableSql);
@@ -330,10 +307,6 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
 
         fieldNames = tableColumns;
       }
-    } else {
-      // when neither are specified:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
     }
 
     LOG.info("Using dataSql: " + dataSql);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
index 4688de3..f97e731 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
@@ -63,17 +63,12 @@ public class GenericJdbcToInitializer extends Initializer<LinkConfiguration, ToJ
   public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     executor = new GenericJdbcExecutor(linkConfig);
 
-    String schemaName = toJobConfig.toJobConfig.tableName;
-
+    String schemaName = executor.encloseIdentifiers(toJobConfig.toJobConfig.schemaName, toJobConfig.toJobConfig.tableName);
     if (schemaName == null) {
       throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
           "Table name extraction not supported yet.");
     }
 
-    if(toJobConfig.toJobConfig.schemaName != null) {
-      schemaName = toJobConfig.toJobConfig.schemaName + "." + schemaName;
-    }
-
     Schema schema = new Schema(schemaName);
     ResultSet rs = null;
     ResultSetMetaData rsmt = null;
@@ -147,10 +142,7 @@ public class GenericJdbcToInitializer extends Initializer<LinkConfiguration, ToJ
 
       // For databases that support schemas (IE: postgresql).
       final String tableInUse = stageEnabled ? stageTableName : tableName;
-      String fullTableName = (schemaName == null) ?
-        executor.delimitIdentifier(tableInUse) :
-        executor.delimitIdentifier(schemaName) +
-          "." + executor.delimitIdentifier(tableInUse);
+      String fullTableName = executor.encloseIdentifiers(schemaName, tableInUse);
 
       if (tableColumns == null) {
         String[] columns = executor.getQueryColumns("SELECT * FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfiguration.java
index ed55bff..ceb6e6d 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfiguration.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfiguration.java
@@ -28,7 +28,10 @@ public class LinkConfiguration {
 
   @Config public LinkConfig linkConfig;
 
+  @Config public SqlDialect dialect;
+
   public LinkConfiguration() {
     linkConfig = new LinkConfig();
+    dialect = new SqlDialect();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/SqlDialect.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/SqlDialect.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/SqlDialect.java
new file mode 100644
index 0000000..12defa3
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/SqlDialect.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Enables user to configure various aspects of the way the JDBC Connector generates
+ * SQL queries.
+ */
+@ConfigClass
+public class SqlDialect {
+  /**
+   * Character(s) that we should use to escape SQL identifiers (tables, column names, ...)
+   */
+  @Input(size = 5)  public String identifierEnclose;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
index 52bf631..73fa308 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
@@ -122,3 +122,10 @@ incrementalRead.checkColumn.help = Column that is checked during incremental rea
 incrementalRead.lastValue.label = Last value
 incrementalRead.lastValue.help = Last read value, fetch will resume with higher values
 
+
+# Dialect
+dialect.label = SQL Dialect
+dialect.help = Dialect that should be used for generated queries
+
+dialect.identifierEnclose.label = Identifier enclose
+dialect.identifierEnclose.help = Character(s) that should be used to enclose table name, schema, column names, ...

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
index 22c9e15..a482ac4 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
@@ -18,46 +18,60 @@
 package org.apache.sqoop.connector.jdbc;
 
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.LinkConfig;
 import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 public class GenericJdbcExecutorTest {
   private final String table;
   private final String emptyTable;
-  private final GenericJdbcExecutor executor;
+  private final String schema;
+  private GenericJdbcExecutor executor;
 
-  private static final int START = -50;
-  private static final int NUMBER_OF_ROWS = 974;
+  private static final int START = -10;
+  private static final int NUMBER_OF_ROWS = 20;
 
   public GenericJdbcExecutorTest() {
     table = getClass().getSimpleName().toUpperCase();
     emptyTable = table + "_EMPTY";
-    executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
+    schema = table + "_SCHEMA";
   }
 
   @BeforeMethod(alwaysRun = true)
   public void setUp() {
-    if(executor.existTable(emptyTable)) {
-      executor.executeUpdate("DROP TABLE " + emptyTable);
-    }
-    executor.executeUpdate("CREATE TABLE "
-      + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
-
-    if(executor.existTable(table)) {
-      executor.executeUpdate("DROP TABLE " + table);
-    }
-    executor.executeUpdate("CREATE TABLE "
-      + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+    executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
+    executor.executeUpdate("CREATE SCHEMA " + executor.encloseIdentifier(schema));
+    executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(emptyTable )+ "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+    executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(table) + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+    executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifiers(schema, table) + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
 
     for (int i = 0; i < NUMBER_OF_ROWS; i++) {
       int value = START + i;
-      String sql = "INSERT INTO " + table
-        + " VALUES(" + value + ", '" + value + "')";
-      executor.executeUpdate(sql);
+      executor.executeUpdate("INSERT INTO " + executor.encloseIdentifier(table) + " VALUES(" + value + ", '" + value + "')");
+      executor.executeUpdate("INSERT INTO " + executor.encloseIdentifiers(schema, table) + " VALUES(" + value + ", '" + value + "')");
+    }
+  }
+
+  @AfterMethod
+  public void tearDown() throws SQLException {
+    executor.close();
+    try {
+      DriverManager.getConnection(GenericJdbcTestConstants.URL_DROP);
+    } catch(SQLException e) {
+      // Code 8006 means that the database has been successfully dropped
+      if(e.getErrorCode() != 45000 && e.getNextException().getErrorCode() == 8006) {
+        throw e;
+      }
+
     }
   }
 
@@ -71,6 +85,44 @@ public class GenericJdbcExecutorTest {
   }
 
   @Test
+  public void testGetPrimaryKey() {
+    assertNull(executor.getPrimaryKey("non-existing-table"));
+    assertNull(executor.getPrimaryKey("non-existing-schema", "non-existing-table"));
+    assertNull(executor.getPrimaryKey("non-existing-catalog", "non-existing-schema", "non-existing-table"));
+
+    assertEquals(executor.getPrimaryKey(table), "ICOL");
+    assertEquals(executor.getPrimaryKey(schema, table), "ICOL");
+  }
+
+  @Test
+  public void testExistsTable() {
+    assertFalse(executor.existTable("non-existing-table"));
+    assertFalse(executor.existTable("non-existing-schema", "non-existing-table"));
+    assertFalse(executor.existTable("non-existing-catalog", "non-existing-schema", "non-existing-table"));
+
+    assertTrue(executor.existTable(table));
+    assertTrue(executor.existTable(schema, table));
+  }
+
+  @Test
+  public void testEncloseIdentifier() {
+    assertEquals(executor.encloseIdentifier("a"), "\"a\"");
+  }
+
+  @Test
+  public void testEncloseIdentifiers() {
+    assertEquals(executor.encloseIdentifiers("a"), "\"a\"");
+    assertEquals(executor.encloseIdentifiers(null, "a"), "\"a\"");
+    assertEquals(executor.encloseIdentifiers("a", "b"), "\"a\".\"b\"");
+  }
+
+  @Test
+  public void testColumnList() {
+    assertEquals(executor.columnList("a"), "\"a\"");
+    assertEquals(executor.columnList("a", "b"), "\"a\", \"b\"");
+  }
+
+  @Test
   public void testDeleteTableData() throws Exception {
     executor.deleteTableData(table);
     assertEquals(0, executor.getTableRowCount(table),

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
index 8a5dba4..e16c631 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
@@ -32,6 +32,11 @@ public class GenericJdbcTestConstants {
   public static final String URL = "jdbc:derby:memory:TESTDB;create=true";
 
   /**
+   * URL to drop the in-memory database
+   */
+  public static final String URL_DROP = "jdbc:derby:memory:TESTDB;drop=true";
+
+  /**
    * Test link configuration
    */
   public static final LinkConfiguration LINK_CONFIGURATION = new LinkConfiguration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index 77ac9c3..264cadf 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -63,12 +63,12 @@ public class TestExtractor {
 
     if (!executor.existTable(tableName)) {
       executor.executeUpdate("CREATE TABLE "
-          + executor.delimitIdentifier(tableName)
+          + executor.encloseIdentifier(tableName)
           + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
 
       for (int i = 0; i < NUMBER_OF_ROWS; i++) {
         int value = START + i;
-        String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
+        String sql = "INSERT INTO " + executor.encloseIdentifier(tableName)
             + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
         executor.executeUpdate(sql);
       }
@@ -93,7 +93,7 @@ public class TestExtractor {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT * FROM " + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}");
+        "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
 
     GenericJdbcPartition partition;
 
@@ -133,7 +133,7 @@ public class TestExtractor {
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
         "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
-            + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+            + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
 
     GenericJdbcPartition partition;
 
@@ -174,7 +174,7 @@ public class TestExtractor {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
         "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
-            + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+            + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
 
     GenericJdbcPartition partition = new GenericJdbcPartition();
 
@@ -192,12 +192,12 @@ public class TestExtractor {
   public void testNullValueExtracted() throws Exception {
 
     if (!executor.existTable(nullDataTableName)) {
-      executor.executeUpdate("CREATE TABLE " + executor.delimitIdentifier(nullDataTableName)
+      executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(nullDataTableName)
           + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
 
       for (int i = 0; i < NUMBER_OF_ROWS; i++) {
         int value = i;
-        String sql = "INSERT INTO " + executor.delimitIdentifier(nullDataTableName) + " VALUES(" + value + ",null,null,null)";
+        String sql = "INSERT INTO " + executor.encloseIdentifier(nullDataTableName) + " VALUES(" + value + ",null,null,null)";
         executor.executeUpdate(sql);
       }
     }
@@ -210,7 +210,7 @@ public class TestExtractor {
 
     FromJobConfiguration jobConfig = new FromJobConfiguration();
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT * FROM " + executor.delimitIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
+        "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyNullDataWriter writer = new DummyNullDataWriter();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
index 6ae6f90..31fafb1 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
@@ -63,9 +63,9 @@ public class TestFromInitializer {
   public void setUp() {
     executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+    String fullTableName = executor.encloseIdentifier(schemaName) + "." + executor.encloseIdentifier(tableName);
     if (!executor.existTable(tableName)) {
-      executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
+      executor.executeUpdate("CREATE SCHEMA " + executor.encloseIdentifier(schemaName));
       executor.executeUpdate("CREATE TABLE "
           + fullTableName
           + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
@@ -78,7 +78,7 @@ public class TestFromInitializer {
       }
     }
 
-    fullTableName = executor.delimitIdentifier(schemalessTableName);
+    fullTableName = executor.encloseIdentifier(schemalessTableName);
     if (!executor.existTable(schemalessTableName)) {
       executor.executeUpdate("CREATE TABLE "
           + fullTableName
@@ -130,10 +130,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
-            + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "ICOL",
+        "SELECT * FROM " + executor.encloseIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(START),
         String.valueOf(START+NUMBER_OF_ROWS-1));
@@ -159,9 +158,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "ICOL",
+        "SELECT * FROM " + executor.encloseIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(START),
         String.valueOf(START+NUMBER_OF_ROWS-1));
@@ -189,9 +188,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "ICOL",
+        "SELECT * FROM " + executor.encloseIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(1),
         String.valueOf(START+NUMBER_OF_ROWS-1));
@@ -218,10 +217,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
-            + " WHERE ${CONDITIONS}",
+        "SELECT ICOL,VCOL FROM " + executor.encloseIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
         tableColumns,
-        "ICOL",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(START),
         String.valueOf(START+NUMBER_OF_ROWS-1));
@@ -246,10 +244,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
-            + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "DCOL",
+        "SELECT * FROM " + schemalessTableName  + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"DCOL\"",
         String.valueOf(Types.DOUBLE),
         String.valueOf((double)START),
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
@@ -276,9 +273,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "ICOL",
+        "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(START),
         String.valueOf((START+NUMBER_OF_ROWS-1)));
@@ -306,9 +303,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "ICOL",
+        "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(1),
         String.valueOf((START+NUMBER_OF_ROWS-1)));
@@ -335,11 +332,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
-            + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
-            + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
+        "SELECT ICOL,VCOL FROM (" + schemalessTableSql + ") SQOOP_SUBQUERY_ALIAS",
         tableColumns,
-        "DCOL",
+        "\"DCOL\"",
         String.valueOf(Types.DOUBLE),
         String.valueOf((double)START),
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
@@ -351,7 +346,7 @@ public class TestFromInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+    String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -366,10 +361,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + fullTableName
-            + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "ICOL",
+        "SELECT * FROM " + fullTableName + " WHERE ${CONDITIONS}",
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(START),
         String.valueOf(START+NUMBER_OF_ROWS-1));
@@ -381,7 +375,7 @@ public class TestFromInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+    String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -397,10 +391,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT ICOL,VCOL FROM " + fullTableName
-            + " WHERE ${CONDITIONS}",
+        "SELECT ICOL,VCOL FROM " + fullTableName + " WHERE ${CONDITIONS}",
         tableColumns,
-        "ICOL",
+        "\"ICOL\"",
         String.valueOf(Types.INTEGER),
         String.valueOf(START),
         String.valueOf(START+NUMBER_OF_ROWS-1));
@@ -412,8 +405,6 @@ public class TestFromInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
-
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
     jobConfig.fromJobConfig.schemaName = schemaName;
@@ -428,10 +419,9 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT * FROM " + fullTableName
-            + " WHERE ${CONDITIONS}",
-        "ICOL,DCOL,VCOL",
-        "DCOL",
+        tableSql,
+        "\"ICOL\", \"DCOL\", \"VCOL\"",
+        "\"DCOL\"",
         String.valueOf(Types.DOUBLE),
         String.valueOf((double)START),
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
@@ -443,6 +433,8 @@ public class TestFromInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
+    String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
+
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
     jobConfig.fromJobConfig.schemaName = schemaName;
@@ -456,7 +448,7 @@ public class TestFromInitializer {
     Initializer initializer = new GenericJdbcFromInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
     Schema schema = initializer.getSchema(initializerContext, linkConfig, jobConfig);
-    assertEquals(getSchema(jobConfig.fromJobConfig.schemaName + "." + tableName), schema);
+    assertEquals(getSchema(fullTableName), schema);
   }
 
   @Test
@@ -487,8 +479,6 @@ public class TestFromInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
-
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
     jobConfig.fromJobConfig.schemaName = schemaName;
@@ -504,32 +494,32 @@ public class TestFromInitializer {
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
-        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
-            + "(SELECT * FROM " + fullTableName
-            + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS",
+        "SELECT ICOL,VCOL FROM (" + tableSql + ") SQOOP_SUBQUERY_ALIAS",
         tableColumns,
-        "DCOL",
+        "\"DCOL\"",
         String.valueOf(Types.DOUBLE),
         String.valueOf((double)START),
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
   }
 
-  private void verifyResult(MutableContext context,
-      String dataSql, String fieldNames,
-      String partitionColumnName, String partitionColumnType,
-      String partitionMinValue, String partitionMaxValue) {
-    assertEquals(dataSql, context.getString(
-        GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL));
-    assertEquals(fieldNames, context.getString(
-        Constants.JOB_ETL_FIELD_NAMES));
-
-    assertEquals(partitionColumnName, context.getString(
-        GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME));
-    assertEquals(partitionColumnType, context.getString(
-        GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE));
-    assertEquals(partitionMinValue, context.getString(
-        GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE));
-    assertEquals(partitionMaxValue, context.getString(
-        GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE));
+  /**
+   * Asserts expected content inside the generated context.
+   *
+   * @param context Context that we're validating against
+   * @param dataSql Expected SQL fragment
+   * @param fieldNames All detected field names, they need to be properly escaped
+   * @param partitionColumnName Partition column name, it needs to be properly escaped
+   * @param partitionColumnType Partition column type
+   * @param partitionMinValue Minimal value for partitioning
+   * @param partitionMaxValue Maximal value for partitioning
+   */
+  private void verifyResult(MutableContext context, String dataSql, String fieldNames, String partitionColumnName, String partitionColumnType, String partitionMinValue, String partitionMaxValue) {
+    assertEquals(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL), dataSql);
+    assertEquals(context.getString(Constants.JOB_ETL_FIELD_NAMES), fieldNames);
+
+    assertEquals(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME), partitionColumnName);
+    assertEquals(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE), partitionColumnType);
+    assertEquals(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE), partitionMinValue);
+    assertEquals(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE), partitionMaxValue);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
index f192c22..c69ec03 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
@@ -72,8 +72,7 @@ public class TestLoader {
     executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
 
     if (!executor.existTable(tableName)) {
-      executor.executeUpdate("CREATE TABLE "
-          + executor.delimitIdentifier(tableName)
+      executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(tableName)
           + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE, DATETIMECOL TIMESTAMP, TIMECOL TIME)");
     } else {
       executor.deleteTableData(tableName);
@@ -98,7 +97,7 @@ public class TestLoader {
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL,
-        "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?,?,?,?)");
+        "INSERT INTO " + executor.encloseIdentifier(tableName) + " VALUES (?,?,?,?,?,?)");
 
 
     Loader loader = new GenericJdbcLoader();
@@ -112,7 +111,7 @@ public class TestLoader {
 
     int index = START;
     ResultSet rs = executor.executeQuery("SELECT * FROM "
-        + executor.delimitIdentifier(tableName) + " ORDER BY ICOL");
+        + executor.encloseIdentifier(tableName) + " ORDER BY ICOL");
     while (rs.next()) {
       assertEquals(index, rs.getObject(1));
       assertEquals((double) index, rs.getObject(2));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
index 1c65fc3..7e36666 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
@@ -61,13 +61,13 @@ public class TestToInitializer {
   public void setUp() {
     executor = new GenericJdbcExecutor(GenericJdbcTestConstants.LINK_CONFIGURATION);
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+    String fullTableName = executor.encloseIdentifier(schemaName) + "." + executor.encloseIdentifier(tableName);
     if (!executor.existTable(tableName)) {
-      executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName));
+      executor.executeUpdate("CREATE SCHEMA " + executor.encloseIdentifier(schemaName));
       executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
     }
 
-    fullTableName = executor.delimitIdentifier(schemalessTableName);
+    fullTableName = executor.encloseIdentifier(schemalessTableName);
     if (!executor.existTable(schemalessTableName)) {
       executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
     }
@@ -84,7 +84,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemalessTableName);
+    String fullTableName = executor.encloseIdentifier(schemalessTableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -106,7 +106,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemalessTableName);
+    String fullTableName = executor.encloseIdentifier(schemalessTableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -140,7 +140,7 @@ public class TestToInitializer {
     Initializer initializer = new GenericJdbcToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
-    verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
+    verifyResult(context, schemalessTableSql);
   }
 
   @Test
@@ -149,7 +149,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+    String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -172,7 +172,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+    String fullTableName = executor.encloseIdentifiers(schemaName, tableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -208,7 +208,7 @@ public class TestToInitializer {
     Initializer initializer = new GenericJdbcToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
-    verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
+    verifyResult(context, tableSql);
   }
 
   private void verifyResult(MutableContext context, String dataSql) {
@@ -255,7 +255,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullStageTableName = executor.delimitIdentifier(stageTableName);
+    String fullStageTableName = executor.encloseIdentifier(stageTableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -333,7 +333,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullStageTableName = executor.delimitIdentifier(stageTableName);
+    String fullStageTableName = executor.encloseIdentifier(stageTableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
@@ -359,7 +359,7 @@ public class TestToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    String fullStageTableName = executor.delimitIdentifier(stageTableName);
+    String fullStageTableName = executor.encloseIdentifier(stageTableName);
 
     linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
     linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
index c84e799..0bf62d5 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
@@ -132,13 +132,13 @@ abstract public class ConnectorTestCase extends TomcatTestCase {
 
   protected void fillRdbmsFromConfig(MJob job, String partitionColumn) {
     MConfigList fromConfig = job.getFromJobConfig();
-    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
-    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName(partitionColumn));
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(getTableName().getTableName());
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
   }
 
   protected void fillRdbmsToConfig(MJob job) {
     MConfigList toConfig = job.getToJobConfig();
-    toConfig.getStringInput("toJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
+    toConfig.getStringInput("toJobConfig.tableName").setValue(getTableName().getTableName());
   }
 
   protected void fillHdfsLink(MLink link) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index dac6db7..aa4fdde 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -17,15 +17,11 @@
  */
 package org.apache.sqoop.integration.connector.jdbc.generic;
 
-import static org.testng.Assert.assertTrue;
-
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.test.testcases.ConnectorTestCase;
 import org.testng.annotations.Test;
 
@@ -174,7 +170,7 @@ public class FromRDBMSToHDFSTest extends ConnectorTestCase {
     MConfigList configs = job.getFromJobConfig();
     configs.getStringInput("fromJobConfig.sql").setValue("SELECT " + provider.escapeColumnName("id")
         + " FROM " + provider.escapeTableName(getTableName().getTableName()) + " WHERE ${CONDITIONS}");
-    configs.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    configs.getStringInput("fromJobConfig.partitionColumn").setValue("id");
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
 
     saveJob(job);
@@ -217,7 +213,7 @@ public class FromRDBMSToHDFSTest extends ConnectorTestCase {
         "SELECT " + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("i") + ", "
             + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("j")
             + " FROM " + provider.escapeTableName(getTableName().getTableName()) + " WHERE ${CONDITIONS}");
-    configs.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
+    configs.getStringInput("fromJobConfig.partitionColumn").setValue("id");
     configs.getStringInput("fromJobConfig.boundaryQuery").setValue(
         "SELECT MIN(" + partitionColumn + "), MAX(" + partitionColumn + ") FROM "
             + provider.escapeTableName(getTableName().getTableName()));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
index 66c016d..a7be9c6 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
@@ -95,7 +95,7 @@ public class IncrementalReadTest extends ConnectorTestCase implements ITest {
     // Set the rdbms "FROM" config
     fillRdbmsFromConfig(job, "id");
     MConfigList fromConfig = job.getFromJobConfig();
-    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
+    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(checkColumn);
     fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
 
     // Fill hdfs "TO" config
@@ -149,8 +149,8 @@ public class IncrementalReadTest extends ConnectorTestCase implements ITest {
     // Set the rdbms "FROM" config
     MConfigList fromConfig = job.getFromJobConfig();
     fromConfig.getStringInput("fromJobConfig.sql").setValue(query);
-    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
-    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
+    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(checkColumn);
     fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
 
     // Fill hdfs "TO" config

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3362fbb1/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index e9c4543..0008cac 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -65,7 +65,7 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
     // fill rdbms "TO" config here
     fillRdbmsToConfig(job);
     MConfigList configs = job.getToJobConfig();
-    configs.getStringInput("toJobConfig.stageTableName").setValue(provider.escapeTableName(stageTableName.getTableName()));
+    configs.getStringInput("toJobConfig.stageTableName").setValue(stageTableName.getTableName());
 
     // driver config
     MConfigList driverConfig = job.getDriverConfig();