You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by jm...@apache.org on 2011/08/05 23:36:39 UTC

svn commit: r1154386 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/manager/PostgresqlManager.java test/com/cloudera/sqoop/manager/PostgresqlTest.java

Author: jmhsieh
Date: Fri Aug  5 21:36:38 2011
New Revision: 1154386

URL: http://svn.apache.org/viewvc?rev=1154386&view=rev
Log:
SQOOP-303/150. Use catalog views for PostgresqlManager (Bilung Lee)

Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java?rev=1154386&r1=1154385&r2=1154386&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java Fri Aug  5 21:36:38 2011
@@ -30,7 +30,7 @@ import com.cloudera.sqoop.util.ImportExc
 /**
  * Manages connections to Postgresql databases.
  */
-public class PostgresqlManager extends GenericJdbcManager {
+public class PostgresqlManager extends CatalogQueryManager {
 
   public static final Log LOG = LogFactory.getLog(
       PostgresqlManager.class.getName());
@@ -101,14 +101,52 @@ public class PostgresqlManager extends G
   }
 
   @Override
-  public String getPrimaryKey(String tableName) {
-    // The table name is already escaped
-    return super.getPrimaryKey(tableName);
+  public boolean supportsStagingForExport() {
+    return true;
   }
 
   @Override
-  public boolean supportsStagingForExport() {
-    return true;
+  protected String getListDatabasesQuery() {
+    return
+      "SELECT DATNAME FROM PG_CATALOG.PG_DATABASE";
+  }
+
+  @Override
+  protected String getListTablesQuery() {
+    return
+      "SELECT TABLENAME FROM PG_CATALOG.PG_TABLES "
+    + "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())";
+  }
+
+  @Override
+  protected String getListColumnsQuery(String tableName) {
+    return
+      "SELECT col.ATTNAME FROM PG_CATALOG.PG_NAMESPACE sch,"
+    + "  PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col "
+    + "WHERE sch.OID = tab.RELNAMESPACE "
+    + "  AND tab.OID = col.ATTRELID "
+    + "  AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+    + "  AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
+    + "  AND col.ATTNUM >= 1";
+  }
+
+  @Override
+  protected String getPrimaryKeyQuery(String tableName) {
+    return
+      "SELECT col.ATTNAME FROM PG_CATALOG.PG_NAMESPACE sch, "
+    + "  PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col, "
+    + "  PG_CATALOG.PG_INDEX ind "
+    + "WHERE sch.OID = tab.RELNAMESPACE "
+    + "  AND tab.OID = col.ATTRELID "
+    + "  AND tab.OID = ind.INDRELID "
+    + "  AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+    + "  AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
+    + "  AND col.ATTNUM = ANY(ind.INDKEY) "
+    + "  AND ind.INDISPRIMARY";
+  }
+
+  private String escapeLiteral(String literal) {
+    return literal.replace("'", "''");
   }
 }
 

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java?rev=1154386&r1=1154385&r2=1154386&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java Fri Aug  5 21:36:38 2011
@@ -27,12 +27,14 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import com.cloudera.sqoop.SqoopOptions;
@@ -89,6 +91,7 @@ public class PostgresqlTest extends Impo
   static final String DATABASE_USER = "sqooptest";
   static final String DATABASE_NAME = "sqooptest";
   static final String TABLE_NAME = "EMPLOYEES_PG";
+  static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
   static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
 
   @Override
@@ -102,7 +105,14 @@ public class PostgresqlTest extends Impo
 
     LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
 
-    SqoopOptions options = new SqoopOptions(CONNECT_STRING, TABLE_NAME);
+    setUpData(TABLE_NAME);
+    setUpData(SPECIAL_TABLE_NAME);
+
+    LOG.debug("setUp complete.");
+  }
+
+  public void setUpData(String tableName) {
+    SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
     options.setUsername(DATABASE_USER);
 
     ConnManager manager = null;
@@ -121,15 +131,15 @@ public class PostgresqlTest extends Impo
         // Try to remove the table first. DROP TABLE IF EXISTS didn't
         // get added until pg 8.3, so we just use "DROP TABLE" and ignore
         // any exception here if one occurs.
-        st.executeUpdate("DROP TABLE " + manager.escapeTableName(TABLE_NAME));
+        st.executeUpdate("DROP TABLE " + manager.escapeTableName(tableName));
       } catch (SQLException e) {
-        LOG.info("Couldn't drop table " + TABLE_NAME + " (ok)");
+        LOG.info("Couldn't drop table " + tableName + " (ok)");
         LOG.info(e.toString());
         // Now we need to reset the transaction.
         connection.rollback();
       }
 
-      st.executeUpdate("CREATE TABLE " + manager.escapeTableName(TABLE_NAME)
+      st.executeUpdate("CREATE TABLE " + manager.escapeTableName(tableName)
           + " ("
           + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
           + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
@@ -137,11 +147,11 @@ public class PostgresqlTest extends Impo
           + manager.escapeColName("salary") + " FLOAT, "
           + manager.escapeColName("dept") + " VARCHAR(32))");
 
-      st.executeUpdate("INSERT INTO " + manager.escapeTableName(TABLE_NAME)
+      st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
           + " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')");
-      st.executeUpdate("INSERT INTO " + manager.escapeTableName(TABLE_NAME)
+      st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
           + " VALUES(2,'Bob','2009-04-20',400.00,'sales')");
-      st.executeUpdate("INSERT INTO " + manager.escapeTableName(TABLE_NAME)
+      st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
           + " VALUES(3,'Fred','2009-01-23',15.00,'marketing')");
       connection.commit();
     } catch (SQLException sqlE) {
@@ -166,13 +176,13 @@ public class PostgresqlTest extends Impo
   }
 
 
-  private String [] getArgv(boolean isDirect) {
+  private String [] getArgv(boolean isDirect, String tableName) {
     ArrayList<String> args = new ArrayList<String>();
 
     CommonArgs.addHadoopFlags(args);
 
     args.add("--table");
-    args.add(TABLE_NAME);
+    args.add(tableName);
     args.add("--warehouse-dir");
     args.add(getWarehouseDir());
     args.add("--connect");
@@ -189,11 +199,11 @@ public class PostgresqlTest extends Impo
     return args.toArray(new String[0]);
   }
 
-  private void doImportAndVerify(boolean isDirect, String [] expectedResults)
-      throws IOException {
+  private void doImportAndVerify(boolean isDirect, String [] expectedResults,
+      String tableName) throws IOException {
 
     Path warehousePath = new Path(this.getWarehouseDir());
-    Path tablePath = new Path(warehousePath, TABLE_NAME);
+    Path tablePath = new Path(warehousePath, tableName);
 
     Path filePath;
     if (isDirect) {
@@ -208,7 +218,7 @@ public class PostgresqlTest extends Impo
       FileListing.recursiveDeleteDir(tableFile);
     }
 
-    String [] argv = getArgv(isDirect);
+    String [] argv = getArgv(isDirect, tableName);
     try {
       runImport(argv);
     } catch (IOException ioe) {
@@ -242,7 +252,7 @@ public class PostgresqlTest extends Impo
       "3,Fred,2009-01-23,15.0,marketing",
     };
 
-    doImportAndVerify(false, expectedResults);
+    doImportAndVerify(false, expectedResults, TABLE_NAME);
   }
 
   @Test
@@ -252,6 +262,29 @@ public class PostgresqlTest extends Impo
       "3,Fred,2009-01-23,15,marketing",
     };
 
-    doImportAndVerify(true, expectedResults);
+    doImportAndVerify(true, expectedResults, TABLE_NAME);
+  }
+
+  @Test
+  public void testListTables() throws IOException {
+    SqoopOptions options = new SqoopOptions(new Configuration());
+    options.setConnectString(CONNECT_STRING);
+    options.setUsername(DATABASE_USER);
+
+    ConnManager mgr = new PostgresqlManager(options);
+    String[] tables = mgr.listTables();
+    Arrays.sort(tables);
+    assertTrue(TABLE_NAME + " is not found!",
+        Arrays.binarySearch(tables, TABLE_NAME) >= 0);
+  }
+
+  @Test
+  public void testTableNameWithSpecialCharacter() throws IOException {
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400.0,sales",
+        "3,Fred,2009-01-23,15.0,marketing",
+    };
+
+    doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME);
   }
 }