You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/07/09 14:17:47 UTC

svn commit: r792522 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/

Author: tomwhite
Date: Thu Jul  9 12:17:46 2009
New Revision: 792522

URL: http://svn.apache.org/viewvc?rev=792522&view=rev
Log:
MAPREDUCE-685. Sqoop will fail with OutOfMemory on large tables using mysql. Contributed by Aaron Kimball.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  9 12:17:46 2009
@@ -159,3 +159,5 @@
     MAPREDUCE-732. Removed spurious log statements in the node
     blacklisting logic. (Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-685. Sqoop will fail with OutOfMemory on large tables
+    using mysql. (Aaron Kimball via tomwhite)

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Thu Jul  9 12:17:46 2009
@@ -67,6 +67,8 @@
    * Execute a SQL statement to read the named set of columns from a table.
    * If columns is null, all columns from the table are read. This is a local
    * (non-parallelized) read of the table back to the current client.
+   * The client is responsible for calling ResultSet.close() when done with the
+   * returned ResultSet object.
    */
   ResultSet readTable(String tableName, String [] columns) throws SQLException;
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Thu Jul  9 12:17:46 2009
@@ -27,6 +27,7 @@
 import java.io.OutputStreamWriter;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.sql.SQLException;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Thu Jul  9 12:17:46 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.sqoop.manager;
 
 import java.io.IOException;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -53,11 +54,20 @@
   }
 
   @Override
+  protected String getColNamesQuery(String tableName) {
+    // Use mysql-specific hints and LIMIT to return fast
+    return "SELECT t.* FROM " + tableName + " AS t LIMIT 1";
+  }
+
+  @Override
   public String[] listDatabases() {
     // TODO(aaron): Add an automated unit test for this.
 
-    ResultSet results = execute("SHOW DATABASES");
-    if (null == results) {
+    ResultSet results;
+    try {
+      results = execute("SHOW DATABASES");
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: " + sqlE.toString());
       return null;
     }
 
@@ -72,6 +82,12 @@
     } catch (SQLException sqlException) {
       LOG.error("Error reading from database: " + sqlException.toString());
       return null;
+    } finally {
+      try {
+        results.close();
+      } catch (SQLException sqlE) {
+        LOG.warn("Exception closing ResultSet: " + sqlE.toString());
+      }
     }
   }
 
@@ -98,5 +114,30 @@
     // Then run the normal importTable() method.
     super.importTable(tableName, jarFile, conf);
   }
+
+  /**
+   * Executes an arbitrary SQL statement. Sets a MySQL-specific fetch-size
+   * parameter to ensure the entire table is not buffered in RAM before reading
+   * any rows. A consequence of this is that every ResultSet returned
+   * by this method *MUST* be close()'d, or read to exhaustion before
+   * another query can be executed from this ConnManager instance.
+   *
+   * @param stmt The SQL statement to execute
+   * @return A ResultSet encapsulating the results
+   * @throws SQLException if the statement could not be prepared or executed
+   */
+  protected ResultSet execute(String stmt, Object... args) throws SQLException {
+    PreparedStatement statement = null;
+    statement = this.getConnection().prepareStatement(stmt,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+    if (null != args) {
+      for (int i = 0; i < args.length; i++) {
+        statement.setObject(i + 1, args[i]);
+      }
+    }
+
+    LOG.info("Executing SQL statement: " + stmt);
+    return statement.executeQuery();
+  }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Thu Jul  9 12:17:46 2009
@@ -44,9 +44,6 @@
  * ConnManager implementation for generic SQL-compliant database.
  * This is an abstract class; it requires a database-specific
  * ConnManager implementation to actually create the connection.
- *
- * 
- *
  */
 public abstract class SqlManager implements ConnManager {
 
@@ -63,12 +60,23 @@
     this.options = opts;
   }
 
+  /**
+   * @return the SQL query to use in getColumnNames() in case this logic must
+   * be tuned per-database, but the main extraction loop is still inheritable.
+   */
+  protected String getColNamesQuery(String tableName) {
+    return "SELECT t.* FROM " + tableName + " AS t";
+  }
+
   @Override
   public String[] getColumnNames(String tableName) {
-    String stmt = "SELECT t.* FROM " + tableName + " AS t WHERE 1 = 1";
+    String stmt = getColNamesQuery(tableName);
 
-    ResultSet results = execute(stmt);
-    if (null == results) {
+    ResultSet results;
+    try {
+      results = execute(stmt);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: " + sqlE.toString());
       return null;
     }
 
@@ -87,15 +95,32 @@
     } catch (SQLException sqlException) {
       LOG.error("Error reading from database: " + sqlException.toString());
       return null;
+    } finally {
+      try {
+        results.close();
+      } catch (SQLException sqlE) {
+        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+      }
     }
   }
 
+  /**
+   * @return the SQL query to use in getColumnTypes() in case this logic must
+   * be tuned per-database, but the main extraction loop is still inheritable.
+   */
+  protected String getColTypesQuery(String tableName) {
+    return getColNamesQuery(tableName);
+  }
+  
   @Override
   public Map<String, Integer> getColumnTypes(String tableName) {
-    String stmt = "SELECT t.* FROM " + tableName + " AS t WHERE 1 = 1";
+    String stmt = getColTypesQuery(tableName);
 
-    ResultSet results = execute(stmt);
-    if (null == results) {
+    ResultSet results;
+    try {
+      results = execute(stmt);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: " + sqlE.toString());
       return null;
     }
 
@@ -118,6 +143,12 @@
     } catch (SQLException sqlException) {
       LOG.error("Error reading from database: " + sqlException.toString());
       return null;
+    } finally {
+      try {
+        results.close();
+      } catch (SQLException sqlE) { 
+        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+      }
     }
   }
 
@@ -157,28 +188,38 @@
     ResultSet results = null;
     String [] tableTypes = {"TABLE"};
     try {
-      DatabaseMetaData metaData = this.getConnection().getMetaData();
-      results = metaData.getTables(null, null, null, tableTypes);
-    } catch (SQLException sqlException) {
-      LOG.error("Error reading database metadata: " + sqlException.toString());
-      return null;
-    }
-
-    if (null == results) {
-      return null;
-    }
+      try {
+        DatabaseMetaData metaData = this.getConnection().getMetaData();
+        results = metaData.getTables(null, null, null, tableTypes);
+      } catch (SQLException sqlException) {
+        LOG.error("Error reading database metadata: " + sqlException.toString());
+        return null;
+      }
 
-    try {
-      ArrayList<String> tables = new ArrayList<String>();
-      while (results.next()) {
-        String tableName = results.getString("TABLE_NAME");
-        tables.add(tableName);
+      if (null == results) {
+        return null;
       }
 
-      return tables.toArray(new String[0]);
-    } catch (SQLException sqlException) {
-      LOG.error("Error reading from database: " + sqlException.toString());
-      return null;
+      try {
+        ArrayList<String> tables = new ArrayList<String>();
+        while (results.next()) {
+          String tableName = results.getString("TABLE_NAME");
+          tables.add(tableName);
+        }
+
+        return tables.toArray(new String[0]);
+      } catch (SQLException sqlException) {
+        LOG.error("Error reading from database: " + sqlException.toString());
+        return null;
+      }
+    } finally {
+      if (null != results) {
+        try {
+          results.close();
+        } catch (SQLException sqlE) {
+          LOG.warn("Exception closing ResultSet: " + sqlE.toString());
+        }
+      }
     }
   }
 
@@ -190,16 +231,20 @@
       if (null == results) {
         return null;
       }
-
-      if (results.next()) {
-        return results.getString("COLUMN_NAME");
+      
+      try {
+        if (results.next()) {
+          return results.getString("COLUMN_NAME");
+        } else {
+          return null;
+        }
+      } finally {
+        results.close();
       }
     } catch (SQLException sqlException) {
       LOG.error("Error reading primary key metadata: " + sqlException.toString());
       return null;
     }
-
-    return null;
   }
 
   /**
@@ -234,31 +279,18 @@
    * @param stmt The SQL statement to execute
    * @return A ResultSet encapsulating the results or null on error
    */
-  protected ResultSet execute(String stmt, Object... args) {
-    if (null == stmt) {
-      LOG.error("Null statement sent to SqlManager.execute()");
-      return null;
-    }
-
+  protected ResultSet execute(String stmt, Object... args) throws SQLException {
     PreparedStatement statement = null;
-    try {
-      statement = this.getConnection().prepareStatement(stmt,
-          ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-      if (null != args) {
-        for (int i = 0; i < args.length; i++) {
-          statement.setObject(i + 1, args[i]);
-        }
+    statement = this.getConnection().prepareStatement(stmt,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    if (null != args) {
+      for (int i = 0; i < args.length; i++) {
+        statement.setObject(i + 1, args[i]);
       }
-
-      LOG.debug("Executing SQL statement: " + stmt);
-      return statement.executeQuery();
-    } catch (SQLException sqlException) {
-      LOG.error("Error returned by SQL database: " + sqlException.toString());
-      return null;
     }
 
-    // TODO(aaron): Is calling ResultSet.close() sufficient?
-    // Or must statement.close() be called too?
+    LOG.info("Executing SQL statement: " + stmt);
+    return statement.executeQuery();
   }
 
   /**
@@ -319,28 +351,38 @@
    */
   public void execAndPrint(String s) {
     System.out.println("Executing statement: " + s);
-    ResultSet results = execute(s);
-    if (null == results) {
-      LOG.info("Got null results back!");
+    ResultSet results;
+    try {
+      results = execute(s);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: " + sqlE.toString());
       return;
     }
 
     try {
-      int cols = results.getMetaData().getColumnCount();
-      System.out.println("Got " + cols + " columns back");
-      if (cols > 0) {
-        System.out.println("Schema: " + results.getMetaData().getSchemaName(1));
-        System.out.println("Table: " + results.getMetaData().getTableName(1));
+      try {
+        int cols = results.getMetaData().getColumnCount();
+        System.out.println("Got " + cols + " columns back");
+        if (cols > 0) {
+          System.out.println("Schema: " + results.getMetaData().getSchemaName(1));
+          System.out.println("Table: " + results.getMetaData().getTableName(1));
+        }
+      } catch (SQLException sqlE) {
+        LOG.error("SQLException reading result metadata: " + sqlE.toString());
       }
-    } catch (SQLException sqlE) {
-      LOG.error("SQLException reading result metadata: " + sqlE.toString());
-    }
 
-    try {
-      new ResultSetPrinter().printResultSet(System.out, results);
-    } catch (IOException ioe) {
-      LOG.error("IOException writing results to stdout: " + ioe.toString());
-      return;
+      try {
+        new ResultSetPrinter().printResultSet(System.out, results);
+      } catch (IOException ioe) {
+        LOG.error("IOException writing results to stdout: " + ioe.toString());
+        return;
+      }
+    } finally {
+      try {
+        results.close();
+      } catch (SQLException sqlE) {
+        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+      }
     }
   }
 
@@ -368,8 +410,8 @@
       connection = DriverManager.getConnection(options.getConnectString(), username, password);
     }
 
-    connection.setAutoCommit(false);
-    connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+    // We only use this for metadata queries. Loosest semantics are okay.
+    connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
 
     return connection;
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestSqlManager.java Thu Jul  9 12:17:46 2009
@@ -130,8 +130,9 @@
 
   @Test
   public void testReadTable() {
+    ResultSet results = null;
     try {
-      ResultSet results = manager.readTable(HsqldbTestServer.getTableName(),
+      results = manager.readTable(HsqldbTestServer.getTableName(),
           HsqldbTestServer.getFieldNames());
 
       assertNotNull("ResultSet from readTable() is null!", results);
@@ -161,17 +162,34 @@
       assertEquals("Expected right sum of 20", EXPECTED_COL2_SUM, sumCol2);
     } catch (SQLException sqlException) {
       fail("SQL Exception: " + sqlException.toString());
+    } finally {
+      if (null != results) {
+        try {
+          results.close();
+        } catch (SQLException sqlE) {
+          fail("SQL Exception in ResultSet.close(): " + sqlE.toString());
+        }
+      }
     }
   }
 
   @Test
   public void testReadMissingTable() {
+    ResultSet results = null;
     try {
       String [] colNames = { "*" };
-      ResultSet results = manager.readTable(MISSING_TABLE, colNames);
+      results = manager.readTable(MISSING_TABLE, colNames);
       assertNull("Expected null resultset from readTable(MISSING_TABLE)", results);
     } catch (SQLException sqlException) {
       // we actually expect this. pass.
+    } finally {
+      if (null != results) {
+        try {
+          results.close();
+        } catch (SQLException sqlE) {
+          fail("SQL Exception in ResultSet.close(): " + sqlE.toString());
+        }
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=792522&r1=792521&r2=792522&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Thu Jul  9 12:17:46 2009
@@ -239,8 +239,9 @@
    *
    */
   protected void verifyReadback(int colNum, String expectedVal) {
+    ResultSet results = null;
     try {
-      ResultSet results = getManager().readTable(getTableName(), getColNames());
+      results = getManager().readTable(getTableName(), getColNames());
       assertNotNull("Null results from readTable()!", results);
       assertTrue("Expected at least one row returned", results.next());
       String resultVal = results.getString(colNum);
@@ -250,9 +251,16 @@
 
       assertEquals("Error reading inserted value back from db", expectedVal, resultVal);
       assertFalse("Expected at most one row returned", results.next());
-      results.close();
     } catch (SQLException sqlE) {
       fail("Got SQLException: " + sqlE.toString());
+    } finally {
+      if (null != results) {
+        try {
+          results.close();
+        } catch (SQLException sqlE) {
+          fail("Got SQLException in resultset.close(): " + sqlE.toString());
+        }
+      }
     }
   }