You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/09/19 20:51:42 UTC

svn commit: r697184 - in /hadoop/core/trunk: ./ lib/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/db/ src/test/org/apache/hadoop/mapred/lib/db/

Author: acmurthy
Date: Fri Sep 19 11:51:41 2008
New Revision: 697184

URL: http://svn.apache.org/viewvc?rev=697184&view=rev
Log:
HADOOP-2536. Implement a JDBC based database input and output formats to allow Map-Reduce applications to work with databases. Contributed by Fredrik Hedberg and Enis Soztutar.

Added:
    hadoop/core/trunk/lib/hsqldb-LICENSE.txt
    hadoop/core/trunk/lib/hsqldb.jar   (with props)
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697184&r1=697183&r2=697184&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 11:51:41 2008
@@ -188,6 +188,10 @@
     HADOOP-4070. Provide a mechanism in Hive for registering UDFs from the
     query language. (tomwhite)
 
+    HADOOP-2536. Implement a JDBC based database input and output formats to
+    allow Map-Reduce applications to work with databases. (Fredrik Hedberg and
+    Enis Soztutar via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).

Added: hadoop/core/trunk/lib/hsqldb-LICENSE.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/hsqldb-LICENSE.txt?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/lib/hsqldb-LICENSE.txt (added)
+++ hadoop/core/trunk/lib/hsqldb-LICENSE.txt Fri Sep 19 11:51:41 2008
@@ -0,0 +1,66 @@
+/* Copyright (c) 1995-2000, The Hypersonic SQL Group.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the Hypersonic SQL Group nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE HYPERSONIC SQL GROUP, 
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * This software consists of voluntary contributions made by many individuals 
+ * on behalf of the Hypersonic SQL Group.
+ *
+ *
+ * For work added by the HSQL Development Group:
+ * 
+ * Copyright (c) 2001-2004, The HSQL Development Group
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the HSQL Development Group nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL HSQL DEVELOPMENT GROUP, HSQLDB.ORG, 
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+

Added: hadoop/core/trunk/lib/hsqldb.jar
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/hsqldb.jar?rev=697184&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/core/trunk/lib/hsqldb.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.mapred.lib.db.DBConfiguration;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat;
+import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.hsqldb.Server;
+
+/**
+ * This is a demonstrative program, which uses DBInputFormat for reading
+ * the input data from a database, and DBOutputFormat for writing the data 
+ * to the database. 
+ * <br>
+ * The Program first creates the necessary tables, populates the input table 
+ * and runs the mapred job. 
+ * <br> 
+ * The input data is a mini access log, with a <code>&lt;url,referrer,time&gt;
+ * </code> schema.The output is the number of pageviews of each url in the log, 
+ * having the schema <code>&lt;url,pageview&gt;</code>.  
+ * 
+ * When called with no arguments the program starts a local HSQLDB server, and 
+ * uses this database for storing/retrieving the data. 
+ */
+public class DBCountPageView extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(DBCountPageView.class);
+  
+  private Connection connection;
+  private boolean initialized = false;
+
+  private static final String[] AccessFieldNames = {"url", "referrer", "time"};
+  private static final String[] PageviewFieldNames = {"url", "pageview"};
+  
+  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
+  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+  
+  private Server server;
+  
+  private void startHsqldbServer() {
+    server = new Server();
+    server.setDatabasePath(0, 
+        System.getProperty("test.build.data",".") + "/URLAccess");
+    server.setDatabaseName(0, "URLAccess");
+    server.start();
+  }
+  
+  private void createConnection(String driverClassName
+      , String url) throws Exception {
+    
+    Class.forName(driverClassName);
+    connection = DriverManager.getConnection(url);
+    connection.setAutoCommit(false);
+  }
+
+  private void shutdown() throws SQLException {
+    connection.commit();
+    connection.close();
+    
+    if(server != null) {
+      server.stop();
+      server.shutdown();
+    }
+  }
+
+  private void initialize(String driverClassName, String url) 
+    throws Exception {
+    if(!this.initialized) {
+      if(driverClassName.equals(DRIVER_CLASS)) {
+        startHsqldbServer();
+      }
+      createConnection(driverClassName, url);
+      dropTables();
+      createTables();
+      populateAccess();
+      this.initialized = true;  
+    }
+  }
+  
+  private void dropTables() {
+    String dropAccess = "DROP TABLE Access";
+    String dropPageview = "DROP TABLE Pageview";
+    
+    try {
+      Statement st = connection.createStatement();
+      st.executeUpdate(dropAccess);
+      st.executeUpdate(dropPageview);
+      connection.commit();
+      st.close();
+    }catch (SQLException ex) {
+      //ignore
+    }
+  }
+  
+  private void createTables() throws SQLException {
+
+    String createAccess = 
+      "CREATE TABLE " +
+      "Access(url      VARCHAR(100) NOT NULL," +
+            " referrer VARCHAR(100)," +
+            " time     BIGINT NOT NULL, " +
+            " PRIMARY KEY (url, time))";
+
+    String createPageview = 
+      "CREATE TABLE " +
+      "Pageview(url      VARCHAR(100) NOT NULL," +
+              " pageview     BIGINT NOT NULL, " +
+               " PRIMARY KEY (url))";
+    
+    Statement st = connection.createStatement();
+    try {
+      st.executeUpdate(createAccess);
+      st.executeUpdate(createPageview);
+      connection.commit();
+    } finally {
+      st.close();
+    }
+  }
+
+  /**
+   * Populates the Access table with generated records.
+   */
+  private void populateAccess() throws SQLException {
+
+    PreparedStatement statement = null ;
+    try {
+      statement = connection.prepareStatement(
+          "INSERT INTO Access(url, referrer, time)" +
+          " VALUES (?, ?, ?)");
+
+      Random random = new Random();
+
+      int time = random.nextInt(50) + 50;
+
+      final int PROBABILITY_PRECISION = 100; //  1 / 100 
+      final int NEW_PAGE_PROBABILITY  = 15;  //  15 / 100
+
+
+      //Pages in the site :
+      String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
+      //linkMatrix[i] is the array of pages(indexes) that page_i links to.  
+      int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
+          {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
+
+      //a mini model of user browsing a la pagerank
+      int currentPage = random.nextInt(pages.length); 
+      String referrer = null;
+
+      for(int i=0; i<time; i++) {
+
+        statement.setString(1, pages[currentPage]);
+        statement.setString(2, referrer);
+        statement.setLong(3, i);
+        statement.execute();
+
+        int action = random.nextInt(PROBABILITY_PRECISION);
+
+        //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
+        if(action < NEW_PAGE_PROBABILITY) { 
+          currentPage = random.nextInt(pages.length); // a random page
+          referrer = null;
+        }
+        else {
+          referrer = pages[currentPage];
+          action = random.nextInt(linkMatrix[currentPage].length);
+          currentPage = linkMatrix[currentPage][action];
+        }
+      }
+      
+      connection.commit();
+      
+    }catch (SQLException ex) {
+      connection.rollback();
+      throw ex;
+    } finally {
+      if(statement != null) {
+        statement.close();
+      }
+    }
+  }
+  
+  /**Verifies the results are correct */
+  private boolean verify() throws SQLException {
+    //check total num pageview
+    String countAccessQuery = "SELECT COUNT(*) FROM Access";
+    String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
+    Statement st = null;
+    ResultSet rs = null;
+    try {
+      st = connection.createStatement();
+      rs = st.executeQuery(countAccessQuery);
+      rs.next();
+      long totalPageview = rs.getLong(1);
+
+      rs = st.executeQuery(sumPageviewQuery);
+      rs.next();
+      long sumPageview = rs.getLong(1);
+
+      LOG.info("totalPageview=" + totalPageview);
+      LOG.info("sumPageview=" + sumPageview);
+
+      return totalPageview == sumPageview && totalPageview != 0;
+    }finally {
+      if(st != null)
+        st.close();
+      if(rs != null)
+        rs.close();
+    }
+  }
+  
+  /** Holds a &lt;url, referrer, time &gt; tuple */
+  static class AccessRecord implements Writable, DBWritable {
+    String url;
+    String referrer;
+    long time;
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.url = Text.readString(in);
+      this.referrer = Text.readString(in);
+      this.time = in.readLong();
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, url);
+      Text.writeString(out, referrer);
+      out.writeLong(time);
+    }
+    
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+      this.url = resultSet.getString(1);
+      this.referrer = resultSet.getString(2);
+      this.time = resultSet.getLong(3);
+    }
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+      statement.setString(1, url);
+      statement.setString(2, referrer);
+      statement.setLong(3, time);
+    }
+  }
+  /** Holds a &lt;url, pageview &gt; tuple */
+  static class PageviewRecord implements Writable, DBWritable {
+    String url;
+    long pageview;
+   
+    public PageviewRecord(String url, long pageview) {
+      this.url = url;
+      this.pageview = pageview;
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.url = Text.readString(in);
+      this.pageview = in.readLong();
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, url);
+      out.writeLong(pageview);
+    }
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+      this.url = resultSet.getString(1);
+      this.pageview = resultSet.getLong(2);
+    }
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+      statement.setString(1, url);
+      statement.setLong(2, pageview);
+    }
+    @Override
+    public String toString() {
+      return url + " " + pageview;
+    }
+  }
+  
+  /**
+   * Mapper extracts URLs from the AccessRecord (tuples from db), 
+   * and emits a &lt;url,1&gt; pair for each access record. 
+   */
+  static class PageviewMapper extends MapReduceBase 
+    implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
+    
+    LongWritable ONE = new LongWritable(1L);
+    @Override
+    public void map(LongWritable key, AccessRecord value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      
+      Text oKey = new Text(value.url);
+      output.collect(oKey, ONE);
+    }
+  }
+  
+  /**
+   * Reducer sums up the pageviews and emits a PageviewRecord, 
+   * which will correspond to one tuple in the db.
+   */
+  static class PageviewReducer extends MapReduceBase 
+    implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
+    
+    NullWritable n = NullWritable.get();
+    @Override
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
+        throws IOException {
+      
+      long sum = 0L;
+      while(values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(new PageviewRecord(key.toString(), sum), n);
+    }
+  }
+  
+  @Override
+  //Usage DBCountPageView [driverClass dburl]
+  public int run(String[] args) throws Exception {
+    
+    String driverClassName = DRIVER_CLASS;
+    String url = DB_URL;
+    
+    if(args.length > 1) {
+      driverClassName = args[0];
+      url = args[1];
+    }
+    
+    initialize(driverClassName, url);
+
+    JobConf job = new JobConf(getConf(), DBCountPageView.class);
+        
+    job.setJobName("Count Pageviews of URLs");
+
+    job.setMapperClass(PageviewMapper.class);
+    job.setCombinerClass(LongSumReducer.class);
+    job.setReducerClass(PageviewReducer.class);
+
+    DBConfiguration.configureDB(job, driverClassName, url);
+    
+    DBInputFormat.setInput(job, AccessRecord.class, "Access"
+        , null, "url", AccessFieldNames);
+
+    DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
+    
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+
+    job.setOutputKeyClass(PageviewRecord.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    try {
+      JobClient.runJob(job);
+      
+      boolean correct = verify();
+      if(!correct) {
+        throw new RuntimeException("Evaluation was not correct!");
+      }
+    } finally {
+      shutdown();    
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new DBCountPageView(), args);
+    System.exit(ret);
+  }
+
+}

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=697184&r1=697183&r2=697184&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Fri Sep 19 11:51:41 2008
@@ -53,6 +53,7 @@
       pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
       pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
       pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from several files.");
+      pgd.addClass("dbcount", DBCountPageView.class, "An example job that count the pageview counts from a database.");
       pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
       pgd.addClass("terasort", TeraSort.class, "Run the terasort");
       pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat.NullDBWritable;
+
+/**
+ * A container for configuration property names for jobs with DB input/output. 
+ * <br>
+ * The job can be configured using the static methods in this class, 
+ * {@link DBInputFormat}, and {@link DBOutputFormat}. 
+ * <p> 
+ * Alternatively, the properties can be set in the configuration with proper
+ * values. 
+ *   
+ * @see DBConfiguration#configureDB(JobConf, String, String, String, String)
+ * @see DBInputFormat#setInput(JobConf, Class, String, String)
+ * @see DBInputFormat#setInput(JobConf, Class, String, String, String, String...)
+ * @see DBOutputFormat#setOutput(JobConf, String, String...)
+ */
+public class DBConfiguration {
+
+  /** The JDBC Driver class name */
+  public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+  
+  /** JDBC Database access URL */
+  public static final String URL_PROPERTY = "mapred.jdbc.url";
+
+  /** User name to access the database */
+  public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+  
+  /** Password to access the database */
+  public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+
+  /** Input table name */
+  public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+
+  /** Field names in the Input table */
+  public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+
+  /** WHERE clause in the input SELECT statement */
+  public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+  
+  /** ORDER BY clause in the input SELECT statement */
+  public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+  
+  /** Whole input query, exluding LIMIT...OFFSET */
+  public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+  
+  /** Input query to get the count of records */
+  public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+  
+  /** Class name implementing DBWritable which will hold input tuples */
+  public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+
+  /** Output table name */
+  public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+
+  /** Field names in the Output table */
+  public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";  
+
+  /**
+   * Sets the DB access related fields in the JobConf.  
+   * @param job the job
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL. 
+   * @param userName DB access username 
+   * @param passwd DB access passwd
+   */
+  public static void configureDB(JobConf job, String driverClass, String dbUrl
+      , String userName, String passwd) {
+
+    job.set(DRIVER_CLASS_PROPERTY, driverClass);
+    job.set(URL_PROPERTY, dbUrl);
+    if(userName != null)
+      job.set(USERNAME_PROPERTY, userName);
+    if(passwd != null)
+      job.set(PASSWORD_PROPERTY, passwd);    
+  }
+
+  /**
+   * Sets the DB access related fields in the JobConf.  
+   * @param job the job
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL. 
+   */
+  public static void configureDB(JobConf job, String driverClass, String dbUrl) {
+    configureDB(job, driverClass, dbUrl, null, null);
+  }
+
+  private JobConf job;
+
+  DBConfiguration(JobConf job) {
+    this.job = job;
+  }
+
+  /** Returns a connection object o the DB 
+   * @throws ClassNotFoundException 
+   * @throws SQLException */
+  Connection getConnection() throws ClassNotFoundException, SQLException{
+
+    Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
+
+    if(job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
+      return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
+    } else {
+      return DriverManager.getConnection(
+          job.get(DBConfiguration.URL_PROPERTY), 
+          job.get(DBConfiguration.USERNAME_PROPERTY), 
+          job.get(DBConfiguration.PASSWORD_PROPERTY));
+    }
+  }
+
+  String getInputTableName() {
+    return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
+  }
+
+  void setInputTableName(String tableName) {
+    job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+  }
+
+  String[] getInputFieldNames() {
+    return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
+  }
+
+  void setInputFieldNames(String... fieldNames) {
+    job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
+  }
+
+  String getInputConditions() {
+    return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
+  }
+
+  void setInputConditions(String conditions) {
+    if (conditions != null && conditions.length() > 0)
+      job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+  }
+
+  String getInputOrderBy() {
+    return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
+  }
+  
+  void setInputOrderBy(String orderby) {
+    if(orderby != null && orderby.length() >0) {
+      job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
+    }
+  }
+  
+  String getInputQuery() {
+    return job.get(DBConfiguration.INPUT_QUERY);
+  }
+  
+  void setInputQuery(String query) {
+    if(query != null && query.length() >0) {
+      job.set(DBConfiguration.INPUT_QUERY, query);
+    }
+  }
+  
+  String getInputCountQuery() {
+    return job.get(DBConfiguration.INPUT_COUNT_QUERY);
+  }
+  
+  void setInputCountQuery(String query) {
+    if(query != null && query.length() >0) {
+      job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
+    }
+  }
+  
+  
+  Class<?> getInputClass() {
+    return job.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, NullDBWritable.class);
+  }
+
+  void setInputClass(Class<? extends DBWritable> inputClass) {
+    job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
+  }
+
+  String getOutputTableName() {
+    return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
+  }
+
+  void setOutputTableName(String tableName) {
+    job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+  }
+
+  String[] getOutputFieldNames() {
+    return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
+  }
+
+  void setOutputFieldNames(String... fieldNames) {
+    job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
+  }
+
+}
+

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A InputFormat that reads input data from an SQL table.
+ * <p>
+ * DBInputFormat emits LongWritables containing the record number as 
+ * key and DBWritables as value. 
+ * 
+ * The SQL query, and input class can be using one of the two 
+ * setInput methods.
+ */
+public class DBInputFormat<T  extends DBWritable>
+  implements InputFormat<LongWritable, T>, JobConfigurable {
+  /**
+   * A RecordReader that reads records from a SQL table.
+   * Emits LongWritables containing the record number as 
+   * key and DBWritables as value.  
+   */
+  protected class DBRecordReader implements
+  RecordReader<LongWritable, T> {
+    private ResultSet results;
+
+    private Statement statement;
+
+    private Class<T> inputClass;
+
+    private JobConf job;
+
+    private DBInputSplit split;
+
+    private long pos = 0;
+
+    /**
+     * @param split The InputSplit to read data for
+     * @throws SQLException 
+     */
+    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) throws SQLException {
+      this.inputClass = inputClass;
+      this.split = split;
+      this.job = job;
+      
+      statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+      //statement.setFetchSize(Integer.MIN_VALUE);
+      results = statement.executeQuery(getSelectQuery());
+    }
+
+    /** Returns the query for selecting the records, 
+     * subclasses can override this for custom behaviour.*/
+    protected String getSelectQuery() {
+      StringBuilder query = new StringBuilder();
+      
+      if(dbConf.getInputQuery() == null) {
+        query.append("SELECT ");
+
+        for (int i = 0; i < fieldNames.length; i++) {
+          query.append(fieldNames[i]);
+          if(i != fieldNames.length -1) {
+            query.append(", ");
+          }
+        }
+
+        query.append(" FROM ").append(tableName);
+        query.append(" AS ").append(tableName); //in hsqldb this is necessary
+        if (conditions != null && conditions.length() > 0)
+          query.append(" WHERE (").append(conditions).append(")");
+        String orderBy = dbConf.getInputOrderBy();
+        if(orderBy != null && orderBy.length() > 0) {
+          query.append(" ORDER BY ").append(orderBy);
+        }
+      }
+      else {
+        query.append(dbConf.getInputQuery());
+      }
+
+      try {
+        query.append(" LIMIT ").append(split.getLength());
+        query.append(" OFFSET ").append(split.getStart());
+      }
+      catch (IOException ex) {
+        //ignore, will not throw
+      }
+      return query.toString();
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      try {
+        connection.commit();
+        results.close();
+        statement.close();
+      } catch (SQLException e) {
+        throw new IOException(e.getMessage());
+      }
+    }
+
+    /** {@inheritDoc} */
+    public LongWritable createKey() {
+      return new LongWritable();  
+    }
+
+    /** {@inheritDoc} */
+    public T createValue() {
+      return ReflectionUtils.newInstance(inputClass, job);
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() throws IOException {
+      return pos / (float)split.getLength();
+    }
+
+    /** {@inheritDoc} */
+    public boolean next(LongWritable key, T value) throws IOException {
+      try {
+        if (!results.next())
+          return false;
+
+        // Set the key field value as the output key value
+        key.set(pos + split.getStart());
+
+        value.readFields(results);
+
+        pos ++;
+      } catch (SQLException e) {
+        throw new IOException(e.getMessage());
+      }
+      return true;
+    }
+  }
+
+  /**
+   * A Class that does nothing, implementing DBWritable
+   */
+  public static class NullDBWritable implements DBWritable, Writable {
+    @Override
+    public void readFields(DataInput in) throws IOException { }
+    @Override
+    public void readFields(ResultSet arg0) throws SQLException { }
+    @Override
+    public void write(DataOutput out) throws IOException { }
+    @Override
+    public void write(PreparedStatement arg0) throws SQLException { }
+  }
+  /**
+   * A InputSplit that spans a set of rows
+   */
+  protected static class DBInputSplit implements InputSplit {
+
+    private long end = 0;
+    private long start = 0;
+
+    /**
+     * Default Constructor
+     */
+    public DBInputSplit() {
+    }
+
+    /**
+     * Convenience Constructor
+     * @param start the index of the first row to select
+     * @param end the index of the last row to select
+     */
+    public DBInputSplit(long start, long end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    /** {@inheritDoc} */
+    public String[] getLocations() throws IOException {
+      // TODO Add a layer to enable SQL "sharding" and support locality
+      return new String[] {};
+    }
+
+    /**
+     * @return The index of the first row to select
+     */
+    public long getStart() {
+      return start;
+    }
+
+    /**
+     * @return The index of the last row to select
+     */
+    public long getEnd() {
+      return end;
+    }
+
+    /**
+     * @return The total row count in this split
+     */
+    public long getLength() throws IOException {
+      return end - start;
+    }
+
+    /** {@inheritDoc} */
+    public void readFields(DataInput input) throws IOException {
+      start = input.readLong();
+      end = input.readLong();
+    }
+
+    /** {@inheritDoc} */
+    public void write(DataOutput output) throws IOException {
+      output.writeLong(start);
+      output.writeLong(end);
+    }
+  }
+
+  private String conditions;
+
+  private Connection connection;
+
+  private String tableName;
+
+  private String[] fieldNames;
+
+  private DBConfiguration dbConf;
+
+  /** {@inheritDoc} */
+  public void configure(JobConf job) {
+
+    dbConf = new DBConfiguration(job);
+
+    try {
+      this.connection = dbConf.getConnection();
+      this.connection.setAutoCommit(false);
+      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    tableName = dbConf.getInputTableName();
+    fieldNames = dbConf.getInputFieldNames();
+    conditions = dbConf.getInputConditions();
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+
+    Class inputClass = dbConf.getInputClass();
+    try {
+      return new DBRecordReader((DBInputSplit) split, inputClass, job);
+    }
+    catch (SQLException ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+
+  /** {@inheritDoc} */
+  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
+
+    try {
+      Statement statement = connection.createStatement();
+
+      ResultSet results = statement.executeQuery(getCountQuery());
+      results.next();
+
+      long count = results.getLong(1);
+      long chunkSize = (count / chunks);
+
+      results.close();
+      statement.close();
+
+      InputSplit[] splits = new InputSplit[chunks];
+
+      // Split the rows into n-number of chunks and adjust the last chunk
+      // accordingly
+      for (int i = 0; i < chunks; i++) {
+        DBInputSplit split;
+
+        if ((i + 1) == chunks)
+          split = new DBInputSplit(i * chunkSize, count);
+        else
+          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
+              + chunkSize);
+
+        splits[i] = split;
+      }
+
+      return splits;
+    } catch (SQLException e) {
+      throw new IOException(e.getMessage());
+    }
+  }
+
+  /** Returns the query for getting the total number of rows, 
+   * subclasses can override this for custom behaviour.*/
+  protected String getCountQuery() {
+    
+    if(dbConf.getInputCountQuery() != null) {
+      return dbConf.getInputCountQuery();
+    }
+    
+    StringBuilder query = new StringBuilder();
+    query.append("SELECT COUNT(*) FROM " + tableName);
+
+    if (conditions != null && conditions.length() > 0)
+      query.append(" WHERE " + conditions);
+    return query.toString();
+  }
+
+  /**
+   * Initializes the map-part of the job with the appropriate input settings.
+   * 
+   * @param job The job
+   * @param inputClass the class object implementing DBWritable, which is the 
+   * Java object holding tuple fields.
+   * @param tableName The table to read data from
+   * @param conditions The condition which to select data with, eg. '(updated >
+   * 20070101 AND length > 0)'
+   * @param orderBy the fieldNames in the orderBy clause.
+   * @param fieldNames The field names in the table
+   * @see #setInput(JobConf, Class, String, String)
+   */
+  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+      String tableName,String conditions, String orderBy, String... fieldNames) {
+    job.setInputFormat(DBInputFormat.class);
+
+    DBConfiguration dbConf = new DBConfiguration(job);
+    dbConf.setInputClass(inputClass);
+    dbConf.setInputTableName(tableName);
+    dbConf.setInputFieldNames(fieldNames);
+    dbConf.setInputConditions(conditions);
+    dbConf.setInputOrderBy(orderBy);
+  }
+  
+  /**
+   * Initializes the map-part of the job with the appropriate input settings.
+   * 
+   * @param job The job
+   * @param inputClass the class object implementing DBWritable, which is the 
+   * Java object holding tuple fields.
+   * @param inputQuery the input query to select fields. Example : 
+   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
+   * @param inputCountQuery the input query that returns the number of records in
+   * the table. 
+   * Example : "SELECT COUNT(f1) FROM Mytable"
+   * @see #setInput(JobConf, Class, String, String, String, String...)
+   */
+  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+      String inputQuery, String inputCountQuery) {
+    job.setInputFormat(DBInputFormat.class);
+    
+    DBConfiguration dbConf = new DBConfiguration(job);
+    dbConf.setInputClass(inputClass);
+    dbConf.setInputQuery(inputQuery);
+    dbConf.setInputCountQuery(inputCountQuery);
+    
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A OutputFormat that sends the reduce output to a SQL table.
+ * <p> 
+ * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
+ * key has a type extending DBWritable. Returned {@link RecordWriter} 
+ * writes <b>only the key</b> to the database with a batch SQL query.  
+ * 
+ */
+public class DBOutputFormat<K  extends DBWritable, V> 
+implements OutputFormat<K,V> {
+
+  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+
+  /**
+   * A RecordWriter that writes the reduce output to a SQL table
+   */
+  protected class DBRecordWriter 
+  implements RecordWriter<K, V> {
+
+    private Connection connection;
+    private PreparedStatement statement;
+
+    protected DBRecordWriter(Connection connection
+        , PreparedStatement statement) throws SQLException {
+      this.connection = connection;
+      this.statement = statement;
+      this.connection.setAutoCommit(false);
+    }
+
+    /** {@inheritDoc} */
+    public void close(Reporter reporter) throws IOException {
+      try {
+        statement.executeBatch();
+        connection.commit();
+      } catch (SQLException e) {
+        try {
+          connection.rollback();
+        }
+        catch (SQLException ex) {
+          LOG.warn(StringUtils.stringifyException(ex));
+        }
+        throw new IOException(e.getMessage());
+      } finally {
+        try {
+          statement.close();
+          connection.close();
+        }
+        catch (SQLException ex) {
+          throw new IOException(ex.getMessage());
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void write(K key, V value) throws IOException {
+      try {
+        key.write(statement);
+        statement.addBatch();
+      } catch (SQLException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Constructs the query used as the prepared statement to insert data.
+   */
+  protected String constructQuery(String table, String[] fieldNames) {
+    StringBuilder query = new StringBuilder();
+    query.append("INSERT INTO ").append(table);      
+    query.append(" VALUES (");
+
+    for (int i = 0; i < fieldNames.length; i++) {
+      query.append("?");
+      if(i != fieldNames.length - 1) {
+        query.append(",");
+      }
+    }
+    query.append(");");
+
+    return query.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(FileSystem filesystem, JobConf job)
+  throws IOException {
+  }
+
+
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
+      JobConf job, String name, Progressable progress) throws IOException {
+
+    DBConfiguration dbConf = new DBConfiguration(job);
+    String tableName = dbConf.getOutputTableName();
+    String[] fieldNames = dbConf.getOutputFieldNames();
+    
+    try {
+      Connection connection = dbConf.getConnection();
+      PreparedStatement statement = null;
+  
+      statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
+      return new DBRecordWriter(connection, statement);
+    }
+    catch (Exception ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Initializes the reduce-part of the job with the appropriate output settings
+   * 
+   * @param job The job
+   * @param tableName The table to insert data into
+   * @param fieldNames The field names in the table
+   */
+  public static void setOutput(JobConf job, String tableName, String... fieldNames) {
+    job.setOutputFormat(DBOutputFormat.class);
+    job.setReduceSpeculativeExecution(false);
+
+    DBConfiguration dbConf = new DBConfiguration(job);
+    
+    dbConf.setOutputTableName(tableName);
+    dbConf.setOutputFieldNames(fieldNames);
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,75 @@
+package org.apache.hadoop.mapred.lib.db;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Objects that are read from/written to a database should implement
+ * <code>DBWritable</code>. DBWritable, is similar to {@link Writable} 
+ * except that the {@link #write(PreparedStatement)} method takes a 
+ * {@link PreparedStatement}, and {@link #readFields(ResultSet)} 
+ * takes a {@link ResultSet}. 
+ * <p>
+ * Implementations are responsible for writing the fields of the object 
+ * to PreparedStatement, and reading the fields of the object from the 
+ * ResultSet. 
+ * 
+ * <p>Example:</p>
+ * If we have the following table in the database :
+ * <pre>
+ * CREATE TABLE MyTable (
+ *   counter        INTEGER NOT NULL,
+ *   timestamp      BIGINT  NOT NULL,
+ * );
+ * </pre>
+ * then we can read/write the tuples from/to the table with :
+ * <p><pre>
+ * public class MyWritable implements Writable, DBWritable {
+ *   // Some data     
+ *   private int counter;
+ *   private long timestamp;
+ *       
+ *   //Writable#write() implementation
+ *   public void write(DataOutput out) throws IOException {
+ *     out.writeInt(counter);
+ *     out.writeLong(timestamp);
+ *   }
+ *       
+ *   //Writable#readFields() implementation
+ *   public void readFields(DataInput in) throws IOException {
+ *     counter = in.readInt();
+ *     timestamp = in.readLong();
+ *   }
+ *       
+ *   public void write(PreparedStatement statement) throws SQLException {
+ *     statement.setInt(1, counter);
+ *     statement.setLong(2, timestamp);
+ *   }
+ *       
+ *   public void readFields(ResultSet resultSet) throws SQLException {
+ *     counter = resultSet.getInt(1);
+ *     timestamp = resultSet.getLong(2);
+ *   } 
+ * }
+ * </pre></p>
+ */
+public interface DBWritable {
+
+  /**
+   * Sets the fields of the object in the {@link PreparedStatement}.
+   * @param statement the statement that the fields are put into.
+   * @throws SQLException
+   */
+	public void write(PreparedStatement statement) throws SQLException;
+	
+	/**
+	 * Reads the fields of the object from the {@link ResultSet}. 
+	 * @param resultSet the {@link ResultSet} to get the fields from.
+	 * @throws SQLException
+	 */
+	public void readFields(ResultSet resultSet) throws SQLException ; 
+	
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html Fri Sep 19 11:51:41 2008
@@ -0,0 +1,44 @@
+<html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+<h2>org.apache.hadoop.mapred.lib.db Package</h2>
+<p>
+This package contains a library to read records from a database as an 
+input to a mapreduce job, and write the output records to the database.   
+</p>
+<p>
+The Database to access can be configured using the static methods in the 
+DBConfiguration class. Jobs reading input from a database should use 
+DBInputFormat#setInput() to set the configuration. And jobs writing 
+its output to the database should use DBOutputFormat#setOutput().
+</p>
+<p> 
+Tuples from/to the database are converted to/from Java objects using 
+DBWritable methods. Typically, for each table in the db, a class extending
+DBWritable is defined, which holds the fields of the tuple. The fields 
+of a record are read from the database using DBWritable#readFields(ResultSet),
+and written to the database using DBWritable#write(PreparedStatament 
+statement). 
+</p>
+<p>
+An example program using both DBInputFormat and DBOutputFormat can be found 
+at src/examples/org/apache/hadoop/examples/DBCountPageview.java.
+</p>
+</body>
+</html>
\ No newline at end of file

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.DBCountPageView;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class TestDBJob extends HadoopTestCase {
+
+  public TestDBJob() throws IOException {
+    super(CLUSTER_MR, DFS_FS, 3, 1);
+  }
+  
+  public void testRun() throws Exception {
+    
+    DBCountPageView testDriver = new DBCountPageView();
+    
+    ToolRunner.run(createJobConf(), testDriver, new String[0]);
+    
+  }
+  
+}