You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/07/09 14:17:47 UTC
svn commit: r792522 - in /hadoop/mapreduce/trunk: ./
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/
src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/
src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/
Author: tomwhite
Date: Thu Jul 9 12:17:46 2009
New Revision: 792522
URL: http://svn.apache.org/viewvc?rev=792522&view=rev
Log:
MAPREDUCE-685. Sqoop will fail with OutOfMemory on large tables using mysql. Contributed by Aaron Kimball.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 9 12:17:46 2009
@@ -159,3 +159,5 @@
MAPREDUCE-732. Removed spurious log statements in the node
blacklisting logic. (Sreekanth Ramakrishnan via yhemanth)
+ MAPREDUCE-685. Sqoop will fail with OutOfMemory on large tables
+ using mysql. (Aaron Kimball via tomwhite)
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Thu Jul 9 12:17:46 2009
@@ -67,6 +67,8 @@
* Execute a SQL statement to read the named set of columns from a table.
* If columns is null, all columns from the table are read. This is a local
* (non-parallelized) read of the table back to the current client.
+ * The client is responsible for calling ResultSet.close() when done with the
+ * returned ResultSet object.
*/
ResultSet readTable(String tableName, String [] columns) throws SQLException;
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Thu Jul 9 12:17:46 2009
@@ -27,6 +27,7 @@
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URL;
+import java.sql.SQLException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Thu Jul 9 12:17:46 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.sqoop.manager;
import java.io.IOException;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -53,11 +54,20 @@
}
@Override
+ protected String getColNamesQuery(String tableName) {
+ // Use mysql-specific hints and LIMIT to return fast
+ return "SELECT t.* FROM " + tableName + " AS t LIMIT 1";
+ }
+
+ @Override
public String[] listDatabases() {
// TODO(aaron): Add an automated unit test for this.
- ResultSet results = execute("SHOW DATABASES");
- if (null == results) {
+ ResultSet results;
+ try {
+ results = execute("SHOW DATABASES");
+ } catch (SQLException sqlE) {
+ LOG.error("Error executing statement: " + sqlE.toString());
return null;
}
@@ -72,6 +82,12 @@
} catch (SQLException sqlException) {
LOG.error("Error reading from database: " + sqlException.toString());
return null;
+ } finally {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("Exception closing ResultSet: " + sqlE.toString());
+ }
}
}
@@ -98,5 +114,30 @@
// Then run the normal importTable() method.
super.importTable(tableName, jarFile, conf);
}
+
+ /**
+ * Executes an arbitrary SQL statement. Sets a mysql-specific fetch size
+ * to ensure the entire table is not buffered in RAM before reading
+ * any rows. A consequence of this is that every ResultSet returned
+ * by this method *MUST* be close()'d, or read to exhaustion, before
+ * another query can be executed from this ConnManager instance.
+ *
+ * @param stmt The SQL statement to execute
+ * @return A ResultSet encapsulating the results; errors are reported by throwing SQLException
+ */
+ protected ResultSet execute(String stmt, Object... args) throws SQLException {
+ PreparedStatement statement = null;
+ statement = this.getConnection().prepareStatement(stmt,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+ if (null != args) {
+ for (int i = 0; i < args.length; i++) {
+ statement.setObject(i + 1, args[i]);
+ }
+ }
+
+ LOG.info("Executing SQL statement: " + stmt);
+ return statement.executeQuery();
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Thu Jul 9 12:17:46 2009
@@ -44,9 +44,6 @@
* ConnManager implementation for generic SQL-compliant database.
* This is an abstract class; it requires a database-specific
* ConnManager implementation to actually create the connection.
- *
- *
- *
*/
public abstract class SqlManager implements ConnManager {
@@ -63,12 +60,23 @@
this.options = opts;
}
+ /**
+ * @return the SQL query to use in getColumnNames(). Subclasses may override
+ * this to tune the query per-database while inheriting the main extraction loop.
+ */
+ protected String getColNamesQuery(String tableName) {
+ return "SELECT t.* FROM " + tableName + " AS t";
+ }
+
@Override
public String[] getColumnNames(String tableName) {
- String stmt = "SELECT t.* FROM " + tableName + " AS t WHERE 1 = 1";
+ String stmt = getColNamesQuery(tableName);
- ResultSet results = execute(stmt);
- if (null == results) {
+ ResultSet results;
+ try {
+ results = execute(stmt);
+ } catch (SQLException sqlE) {
+ LOG.error("Error executing statement: " + sqlE.toString());
return null;
}
@@ -87,15 +95,32 @@
} catch (SQLException sqlException) {
LOG.error("Error reading from database: " + sqlException.toString());
return null;
+ } finally {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+ }
}
}
+ /**
+ * @return the SQL query to use in getColumnTypes(). Subclasses may override
+ * this to tune the query per-database while inheriting the main extraction loop.
+ */
+ protected String getColTypesQuery(String tableName) {
+ return getColNamesQuery(tableName);
+ }
+
@Override
public Map<String, Integer> getColumnTypes(String tableName) {
- String stmt = "SELECT t.* FROM " + tableName + " AS t WHERE 1 = 1";
+ String stmt = getColTypesQuery(tableName);
- ResultSet results = execute(stmt);
- if (null == results) {
+ ResultSet results;
+ try {
+ results = execute(stmt);
+ } catch (SQLException sqlE) {
+ LOG.error("Error executing statement: " + sqlE.toString());
return null;
}
@@ -118,6 +143,12 @@
} catch (SQLException sqlException) {
LOG.error("Error reading from database: " + sqlException.toString());
return null;
+ } finally {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+ }
}
}
@@ -157,28 +188,38 @@
ResultSet results = null;
String [] tableTypes = {"TABLE"};
try {
- DatabaseMetaData metaData = this.getConnection().getMetaData();
- results = metaData.getTables(null, null, null, tableTypes);
- } catch (SQLException sqlException) {
- LOG.error("Error reading database metadata: " + sqlException.toString());
- return null;
- }
-
- if (null == results) {
- return null;
- }
+ try {
+ DatabaseMetaData metaData = this.getConnection().getMetaData();
+ results = metaData.getTables(null, null, null, tableTypes);
+ } catch (SQLException sqlException) {
+ LOG.error("Error reading database metadata: " + sqlException.toString());
+ return null;
+ }
- try {
- ArrayList<String> tables = new ArrayList<String>();
- while (results.next()) {
- String tableName = results.getString("TABLE_NAME");
- tables.add(tableName);
+ if (null == results) {
+ return null;
}
- return tables.toArray(new String[0]);
- } catch (SQLException sqlException) {
- LOG.error("Error reading from database: " + sqlException.toString());
- return null;
+ try {
+ ArrayList<String> tables = new ArrayList<String>();
+ while (results.next()) {
+ String tableName = results.getString("TABLE_NAME");
+ tables.add(tableName);
+ }
+
+ return tables.toArray(new String[0]);
+ } catch (SQLException sqlException) {
+ LOG.error("Error reading from database: " + sqlException.toString());
+ return null;
+ }
+ } finally {
+ if (null != results) {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("Exception closing ResultSet: " + sqlE.toString());
+ }
+ }
}
}
@@ -190,16 +231,20 @@
if (null == results) {
return null;
}
-
- if (results.next()) {
- return results.getString("COLUMN_NAME");
+
+ try {
+ if (results.next()) {
+ return results.getString("COLUMN_NAME");
+ } else {
+ return null;
+ }
+ } finally {
+ results.close();
}
} catch (SQLException sqlException) {
LOG.error("Error reading primary key metadata: " + sqlException.toString());
return null;
}
-
- return null;
}
/**
@@ -234,31 +279,18 @@
* @param stmt The SQL statement to execute
* @return A ResultSet encapsulating the results or null on error
*/
- protected ResultSet execute(String stmt, Object... args) {
- if (null == stmt) {
- LOG.error("Null statement sent to SqlManager.execute()");
- return null;
- }
-
+ protected ResultSet execute(String stmt, Object... args) throws SQLException {
PreparedStatement statement = null;
- try {
- statement = this.getConnection().prepareStatement(stmt,
- ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- if (null != args) {
- for (int i = 0; i < args.length; i++) {
- statement.setObject(i + 1, args[i]);
- }
+ statement = this.getConnection().prepareStatement(stmt,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ if (null != args) {
+ for (int i = 0; i < args.length; i++) {
+ statement.setObject(i + 1, args[i]);
}
-
- LOG.debug("Executing SQL statement: " + stmt);
- return statement.executeQuery();
- } catch (SQLException sqlException) {
- LOG.error("Error returned by SQL database: " + sqlException.toString());
- return null;
}
- // TODO(aaron): Is calling ResultSet.close() sufficient?
- // Or must statement.close() be called too?
+ LOG.info("Executing SQL statement: " + stmt);
+ return statement.executeQuery();
}
/**
@@ -319,28 +351,38 @@
*/
public void execAndPrint(String s) {
System.out.println("Executing statement: " + s);
- ResultSet results = execute(s);
- if (null == results) {
- LOG.info("Got null results back!");
+ ResultSet results;
+ try {
+ results = execute(s);
+ } catch (SQLException sqlE) {
+ LOG.error("Error executing statement: " + sqlE.toString());
return;
}
try {
- int cols = results.getMetaData().getColumnCount();
- System.out.println("Got " + cols + " columns back");
- if (cols > 0) {
- System.out.println("Schema: " + results.getMetaData().getSchemaName(1));
- System.out.println("Table: " + results.getMetaData().getTableName(1));
+ try {
+ int cols = results.getMetaData().getColumnCount();
+ System.out.println("Got " + cols + " columns back");
+ if (cols > 0) {
+ System.out.println("Schema: " + results.getMetaData().getSchemaName(1));
+ System.out.println("Table: " + results.getMetaData().getTableName(1));
+ }
+ } catch (SQLException sqlE) {
+ LOG.error("SQLException reading result metadata: " + sqlE.toString());
}
- } catch (SQLException sqlE) {
- LOG.error("SQLException reading result metadata: " + sqlE.toString());
- }
- try {
- new ResultSetPrinter().printResultSet(System.out, results);
- } catch (IOException ioe) {
- LOG.error("IOException writing results to stdout: " + ioe.toString());
- return;
+ try {
+ new ResultSetPrinter().printResultSet(System.out, results);
+ } catch (IOException ioe) {
+ LOG.error("IOException writing results to stdout: " + ioe.toString());
+ return;
+ }
+ } finally {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+ }
}
}
@@ -368,8 +410,8 @@
connection = DriverManager.getConnection(options.getConnectString(), username, password);
}
- connection.setAutoCommit(false);
- connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ // We only use this for metadata queries. Loosest semantics are okay.
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
return connection;
}
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java Thu Jul 9 12:17:46 2009
@@ -130,8 +130,9 @@
@Test
public void testReadTable() {
+ ResultSet results = null;
try {
- ResultSet results = manager.readTable(HsqldbTestServer.getTableName(),
+ results = manager.readTable(HsqldbTestServer.getTableName(),
HsqldbTestServer.getFieldNames());
assertNotNull("ResultSet from readTable() is null!", results);
@@ -161,17 +162,34 @@
assertEquals("Expected right sum of 20", EXPECTED_COL2_SUM, sumCol2);
} catch (SQLException sqlException) {
fail("SQL Exception: " + sqlException.toString());
+ } finally {
+ if (null != results) {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ fail("SQL Exception in ResultSet.close(): " + sqlE.toString());
+ }
+ }
}
}
@Test
public void testReadMissingTable() {
+ ResultSet results = null;
try {
String [] colNames = { "*" };
- ResultSet results = manager.readTable(MISSING_TABLE, colNames);
+ results = manager.readTable(MISSING_TABLE, colNames);
assertNull("Expected null resultset from readTable(MISSING_TABLE)", results);
} catch (SQLException sqlException) {
// we actually expect this. pass.
+ } finally {
+ if (null != results) {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ fail("SQL Exception in ResultSet.close(): " + sqlE.toString());
+ }
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Thu Jul 9 12:17:46 2009
@@ -239,8 +239,9 @@
*
*/
protected void verifyReadback(int colNum, String expectedVal) {
+ ResultSet results = null;
try {
- ResultSet results = getManager().readTable(getTableName(), getColNames());
+ results = getManager().readTable(getTableName(), getColNames());
assertNotNull("Null results from readTable()!", results);
assertTrue("Expected at least one row returned", results.next());
String resultVal = results.getString(colNum);
@@ -250,9 +251,16 @@
assertEquals("Error reading inserted value back from db", expectedVal, resultVal);
assertFalse("Expected at most one row returned", results.next());
- results.close();
} catch (SQLException sqlE) {
fail("Got SQLException: " + sqlE.toString());
+ } finally {
+ if (null != results) {
+ try {
+ results.close();
+ } catch (SQLException sqlE) {
+ fail("Got SQLException in resultset.close(): " + sqlE.toString());
+ }
+ }
}
}