You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Alexander Schwid (JIRA)" <ji...@apache.org> on 2008/10/02 11:11:44 UTC

[jira] Updated: (HADOOP-4331) DBOutputFormat: add batch size support for JDBC and receive DBWritable object in value not in key

     [ https://issues.apache.org/jira/browse/HADOOP-4331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Schwid updated HADOOP-4331:
-------------------------------------

    Attachment: patch.txt

Patch for this task

> DBOutputFormat: add batch size support for JDBC and receive  DBWritable object in value not in key
> --------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-4331
>                 URL: https://issues.apache.org/jira/browse/HADOOP-4331
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Alexander Schwid
>            Priority: Minor
>             Fix For: 0.19.0
>
>         Attachments: patch.txt
>
>
> add batch size support for JDBC in DBOutputFormat 
> receive DBWritable object in value not in key in DBOutputFormat
> ---------------patch--------------
> Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
> ===================================================================
> --- src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java        (revision 701034)
> +++ src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java        (working copy)
> @@ -80,6 +80,11 @@
>    /** Field names in the Output table */
>    public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";
> +  /** Batch size for output statement */
> +  public static final String OUTPUT_BATCH_SIZE = "mapred.jdbc.output.batch.size";
> +
> +  public static final int DEFAULT_BATCH_SIZE = 1000;
> +
>    /**
>     * Sets the DB access related fields in the JobConf.
>     * @param job the job
> @@ -212,5 +217,12 @@
>      job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
>    }
> +  int getBatchSize() {
> +    return job.getInt(DBConfiguration.OUTPUT_BATCH_SIZE, DEFAULT_BATCH_SIZE);
> +  }
> +
> +  void setBatchSize(int sz) {
> +    job.setInt(DBConfiguration.OUTPUT_BATCH_SIZE, sz);
> +  }
>  }
> Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
> ===================================================================
> --- src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java        (revision 701034)
> +++ src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java        (working copy)
> @@ -37,11 +37,11 @@
>   * A OutputFormat that sends the reduce output to a SQL table.
>   * <p>
>   * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where
> - * key has a type extending DBWritable. Returned {@link RecordWriter}
> - * writes <b>only the key</b> to the database with a batch SQL query.
> + * value has a type extending DBWritable. Returned {@link RecordWriter}
> + * writes <b>only the value</b> to the database with a batch SQL query.
>   *
>   */
> -public class DBOutputFormat<K  extends DBWritable, V>
> +public class DBOutputFormat<K, V extends DBWritable>
>  implements OutputFormat<K,V> {
>    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
> @@ -54,27 +54,21 @@
>      private Connection connection;
>      private PreparedStatement statement;
> +    private int batch = 0;
> +    private int batchSize;
>      protected DBRecordWriter(Connection connection
> -        , PreparedStatement statement) throws SQLException {
> +        , PreparedStatement statement, int batchSize) throws SQLException {
>        this.connection = connection;
>        this.statement = statement;
>        this.connection.setAutoCommit(false);
> +      this.batchSize = batchSize;
>      }
>      /** {@inheritDoc} */
>      public void close(Reporter reporter) throws IOException {
>        try {
> -        statement.executeBatch();
> -        connection.commit();
> -      } catch (SQLException e) {
> -        try {
> -          connection.rollback();
> -        }
> -        catch (SQLException ex) {
> -          LOG.warn(StringUtils.stringifyException(ex));
> -        }
> -        throw new IOException(e.getMessage());
> +        executeBatch();
>        } finally {
>          try {
>            statement.close();
> @@ -89,12 +83,37 @@
>      /** {@inheritDoc} */
>      public void write(K key, V value) throws IOException {
>        try {
> -        key.write(statement);
> +        value.write(statement);
>          statement.addBatch();
> +        batch++;
> +        if (batch == batchSize) {
> +          executeBatch();
> +          batch = 0;
> +        }
> +
>        } catch (SQLException e) {
>          e.printStackTrace();
>        }
>      }
> +
> +    private void executeBatch() throws IOException {
> +      if (batch > 0) {
> +        try {
> +          statement.executeBatch();
> +          connection.commit();
> +          statement.clearBatch();
> +        }
> +        catch(SQLException e) {
> +          try {
> +            connection.rollback();
> +          }
> +          catch (SQLException ex) {
> +            LOG.warn(StringUtils.stringifyException(ex));
> +          }
> +          throw new IOException(e.getMessage());
> +        }
> +      }
> +    }
>    }
>    /**
> @@ -129,13 +148,14 @@
>      DBConfiguration dbConf = new DBConfiguration(job);
>      String tableName = dbConf.getOutputTableName();
>      String[] fieldNames = dbConf.getOutputFieldNames();
> +    int batchSize = dbConf.getBatchSize();
>      try {
>        Connection connection = dbConf.getConnection();
>        PreparedStatement statement = null;
>        statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
> -      return new DBRecordWriter(connection, statement);
> +      return new DBRecordWriter(connection, statement, batchSize);
>      }
>      catch (Exception ex) {
>        throw new IOException(ex.getMessage());

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.