You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by jm...@apache.org on 2011/08/05 23:36:39 UTC
svn commit: r1154386 - in /incubator/sqoop/trunk/src:
java/com/cloudera/sqoop/manager/PostgresqlManager.java
test/com/cloudera/sqoop/manager/PostgresqlTest.java
Author: jmhsieh
Date: Fri Aug 5 21:36:38 2011
New Revision: 1154386
URL: http://svn.apache.org/viewvc?rev=1154386&view=rev
Log:
SQOOP-303/150. Use catalog views for PostgressqlManager (Bilung Lee)
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java?rev=1154386&r1=1154385&r2=1154386&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java Fri Aug 5 21:36:38 2011
@@ -30,7 +30,7 @@ import com.cloudera.sqoop.util.ImportExc
/**
* Manages connections to Postgresql databases.
*/
-public class PostgresqlManager extends GenericJdbcManager {
+public class PostgresqlManager extends CatalogQueryManager {
public static final Log LOG = LogFactory.getLog(
PostgresqlManager.class.getName());
@@ -101,14 +101,52 @@ public class PostgresqlManager extends G
}
@Override
- public String getPrimaryKey(String tableName) {
- // The table name is already escaped
- return super.getPrimaryKey(tableName);
+ public boolean supportsStagingForExport() {
+ return true;
}
@Override
- public boolean supportsStagingForExport() {
- return true;
+ protected String getListDatabasesQuery() {
+ return
+ "SELECT DATNAME FROM PG_CATALOG.PG_DATABASE";
+ }
+
+ @Override
+ protected String getListTablesQuery() {
+ return
+ "SELECT TABLENAME FROM PG_CATALOG.PG_TABLES "
+ + "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())";
+ }
+
+ @Override
+ protected String getListColumnsQuery(String tableName) {
+ return
+ "SELECT col.ATTNAME FROM PG_CATALOG.PG_NAMESPACE sch,"
+ + " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col "
+ + "WHERE sch.OID = tab.RELNAMESPACE "
+ + " AND tab.OID = col.ATTRELID "
+ + " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+ + " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
+ + " AND col.ATTNUM >= 1";
+ }
+
+ @Override
+ protected String getPrimaryKeyQuery(String tableName) {
+ return
+ "SELECT col.ATTNAME FROM PG_CATALOG.PG_NAMESPACE sch, "
+ + " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col, "
+ + " PG_CATALOG.PG_INDEX ind "
+ + "WHERE sch.OID = tab.RELNAMESPACE "
+ + " AND tab.OID = col.ATTRELID "
+ + " AND tab.OID = ind.INDRELID "
+ + " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+ + " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
+ + " AND col.ATTNUM = ANY(ind.INDKEY) "
+ + " AND ind.INDISPRIMARY";
+ }
+
+ private String escapeLiteral(String literal) {
+ return literal.replace("'", "''");
}
}
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java?rev=1154386&r1=1154385&r2=1154386&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java Fri Aug 5 21:36:38 2011
@@ -27,12 +27,14 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import com.cloudera.sqoop.SqoopOptions;
@@ -89,6 +91,7 @@ public class PostgresqlTest extends Impo
static final String DATABASE_USER = "sqooptest";
static final String DATABASE_NAME = "sqooptest";
static final String TABLE_NAME = "EMPLOYEES_PG";
+ static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
@Override
@@ -102,7 +105,14 @@ public class PostgresqlTest extends Impo
LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
- SqoopOptions options = new SqoopOptions(CONNECT_STRING, TABLE_NAME);
+ setUpData(TABLE_NAME);
+ setUpData(SPECIAL_TABLE_NAME);
+
+ LOG.debug("setUp complete.");
+ }
+
+ public void setUpData(String tableName) {
+ SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
options.setUsername(DATABASE_USER);
ConnManager manager = null;
@@ -121,15 +131,15 @@ public class PostgresqlTest extends Impo
// Try to remove the table first. DROP TABLE IF EXISTS didn't
// get added until pg 8.3, so we just use "DROP TABLE" and ignore
// any exception here if one occurs.
- st.executeUpdate("DROP TABLE " + manager.escapeTableName(TABLE_NAME));
+ st.executeUpdate("DROP TABLE " + manager.escapeTableName(tableName));
} catch (SQLException e) {
- LOG.info("Couldn't drop table " + TABLE_NAME + " (ok)");
+ LOG.info("Couldn't drop table " + tableName + " (ok)");
LOG.info(e.toString());
// Now we need to reset the transaction.
connection.rollback();
}
- st.executeUpdate("CREATE TABLE " + manager.escapeTableName(TABLE_NAME)
+ st.executeUpdate("CREATE TABLE " + manager.escapeTableName(tableName)
+ " ("
+ manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+ manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
@@ -137,11 +147,11 @@ public class PostgresqlTest extends Impo
+ manager.escapeColName("salary") + " FLOAT, "
+ manager.escapeColName("dept") + " VARCHAR(32))");
- st.executeUpdate("INSERT INTO " + manager.escapeTableName(TABLE_NAME)
+ st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
+ " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')");
- st.executeUpdate("INSERT INTO " + manager.escapeTableName(TABLE_NAME)
+ st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
+ " VALUES(2,'Bob','2009-04-20',400.00,'sales')");
- st.executeUpdate("INSERT INTO " + manager.escapeTableName(TABLE_NAME)
+ st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
+ " VALUES(3,'Fred','2009-01-23',15.00,'marketing')");
connection.commit();
} catch (SQLException sqlE) {
@@ -166,13 +176,13 @@ public class PostgresqlTest extends Impo
}
- private String [] getArgv(boolean isDirect) {
+ private String [] getArgv(boolean isDirect, String tableName) {
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
args.add("--table");
- args.add(TABLE_NAME);
+ args.add(tableName);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
@@ -189,11 +199,11 @@ public class PostgresqlTest extends Impo
return args.toArray(new String[0]);
}
- private void doImportAndVerify(boolean isDirect, String [] expectedResults)
- throws IOException {
+ private void doImportAndVerify(boolean isDirect, String [] expectedResults,
+ String tableName) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
- Path tablePath = new Path(warehousePath, TABLE_NAME);
+ Path tablePath = new Path(warehousePath, tableName);
Path filePath;
if (isDirect) {
@@ -208,7 +218,7 @@ public class PostgresqlTest extends Impo
FileListing.recursiveDeleteDir(tableFile);
}
- String [] argv = getArgv(isDirect);
+ String [] argv = getArgv(isDirect, tableName);
try {
runImport(argv);
} catch (IOException ioe) {
@@ -242,7 +252,7 @@ public class PostgresqlTest extends Impo
"3,Fred,2009-01-23,15.0,marketing",
};
- doImportAndVerify(false, expectedResults);
+ doImportAndVerify(false, expectedResults, TABLE_NAME);
}
@Test
@@ -252,6 +262,29 @@ public class PostgresqlTest extends Impo
"3,Fred,2009-01-23,15,marketing",
};
- doImportAndVerify(true, expectedResults);
+ doImportAndVerify(true, expectedResults, TABLE_NAME);
+ }
+
+ @Test
+ public void testListTables() throws IOException {
+ SqoopOptions options = new SqoopOptions(new Configuration());
+ options.setConnectString(CONNECT_STRING);
+ options.setUsername(DATABASE_USER);
+
+ ConnManager mgr = new PostgresqlManager(options);
+ String[] tables = mgr.listTables();
+ Arrays.sort(tables);
+ assertTrue(TABLE_NAME + " is not found!",
+ Arrays.binarySearch(tables, TABLE_NAME) >= 0);
+ }
+
+ @Test
+ public void testTableNameWithSpecialCharacter() throws IOException {
+ String [] expectedResults = {
+ "2,Bob,2009-04-20,400.0,sales",
+ "3,Fred,2009-01-23,15.0,marketing",
+ };
+
+ doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME);
}
}