You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Alexander Schwid (JIRA)" <ji...@apache.org> on 2008/10/02 11:11:44 UTC
[jira] Updated: (HADOOP-4331) DBOutputFormat: add batch size
support for JDBC and receive DBWritable object in value not in key
[ https://issues.apache.org/jira/browse/HADOOP-4331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexander Schwid updated HADOOP-4331:
-------------------------------------
Attachment: patch.txt
Patch for this task
> DBOutputFormat: add batch size support for JDBC and receive DBWritable object in value not in key
> --------------------------------------------------------------------------------------------------
>
> Key: HADOOP-4331
> URL: https://issues.apache.org/jira/browse/HADOOP-4331
> Project: Hadoop Core
> Issue Type: Improvement
> Components: mapred
> Reporter: Alexander Schwid
> Priority: Minor
> Fix For: 0.19.0
>
> Attachments: patch.txt
>
>
> add batch size support for JDBC in DBOutputFormat
> receive DBWritable object in value not in key in DBOutputFormat
> ---------------patch--------------
> Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
> ===================================================================
> --- src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (revision 701034)
> +++ src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (working copy)
> @@ -80,6 +80,11 @@
> /** Field names in the Output table */
> public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";
> + /** Batch size for output statement */
> + public static final String OUTPUT_BATCH_SIZE = "mapred.jdbc.output.batch.size";
> +
> + public static final int DEFAULT_BATCH_SIZE = 1000;
> +
> /**
> * Sets the DB access related fields in the JobConf.
> * @param job the job
> @@ -212,5 +217,12 @@
> job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
> }
> + int getBatchSize() {
> + return job.getInt(DBConfiguration.OUTPUT_BATCH_SIZE, DEFAULT_BATCH_SIZE);
> + }
> +
> + void setBatchSize(int sz) {
> + job.setInt(DBConfiguration.OUTPUT_BATCH_SIZE, sz);
> + }
> }
> Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
> ===================================================================
> --- src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (revision 701034)
> +++ src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (working copy)
> @@ -37,11 +37,11 @@
> * A OutputFormat that sends the reduce output to a SQL table.
> * <p>
> * {@link DBOutputFormat} accepts <key,value> pairs, where
> - * key has a type extending DBWritable. Returned {@link RecordWriter}
> - * writes <b>only the key</b> to the database with a batch SQL query.
> + * value has a type extending DBWritable. Returned {@link RecordWriter}
> + * writes <b>only the value</b> to the database with a batch SQL query.
> *
> */
> -public class DBOutputFormat<K extends DBWritable, V>
> +public class DBOutputFormat<K, V extends DBWritable>
> implements OutputFormat<K,V> {
> private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
> @@ -54,27 +54,21 @@
> private Connection connection;
> private PreparedStatement statement;
> + private int batch = 0;
> + private int batchSize;
> protected DBRecordWriter(Connection connection
> - , PreparedStatement statement) throws SQLException {
> + , PreparedStatement statement, int batchSize) throws SQLException {
> this.connection = connection;
> this.statement = statement;
> this.connection.setAutoCommit(false);
> + this.batchSize = batchSize;
> }
> /** {@inheritDoc} */
> public void close(Reporter reporter) throws IOException {
> try {
> - statement.executeBatch();
> - connection.commit();
> - } catch (SQLException e) {
> - try {
> - connection.rollback();
> - }
> - catch (SQLException ex) {
> - LOG.warn(StringUtils.stringifyException(ex));
> - }
> - throw new IOException(e.getMessage());
> + executeBatch();
> } finally {
> try {
> statement.close();
> @@ -89,12 +83,37 @@
> /** {@inheritDoc} */
> public void write(K key, V value) throws IOException {
> try {
> - key.write(statement);
> + value.write(statement);
> statement.addBatch();
> + batch++;
> + if (batch == batchSize) {
> + executeBatch();
> + batch = 0;
> + }
> +
> } catch (SQLException e) {
> e.printStackTrace();
> }
> }
> +
> + private void executeBatch() throws IOException {
> + if (batch > 0) {
> + try {
> + statement.executeBatch();
> + connection.commit();
> + statement.clearBatch();
> + }
> + catch(SQLException e) {
> + try {
> + connection.rollback();
> + }
> + catch (SQLException ex) {
> + LOG.warn(StringUtils.stringifyException(ex));
> + }
> + throw new IOException(e.getMessage());
> + }
> + }
> + }
> }
> /**
> @@ -129,13 +148,14 @@
> DBConfiguration dbConf = new DBConfiguration(job);
> String tableName = dbConf.getOutputTableName();
> String[] fieldNames = dbConf.getOutputFieldNames();
> + int batchSize = dbConf.getBatchSize();
> try {
> Connection connection = dbConf.getConnection();
> PreparedStatement statement = null;
> statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
> - return new DBRecordWriter(connection, statement);
> + return new DBRecordWriter(connection, statement, batchSize);
> }
> catch (Exception ex) {
> throw new IOException(ex.getMessage());
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.