Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/09/19 20:51:42 UTC
svn commit: r697184 - in /hadoop/core/trunk: ./ lib/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/db/ src/test/org/apache/hadoop/mapred/lib/db/
Author: acmurthy
Date: Fri Sep 19 11:51:41 2008
New Revision: 697184
URL: http://svn.apache.org/viewvc?rev=697184&view=rev
Log:
HADOOP-2536. Implement JDBC-based database input and output formats to allow Map-Reduce applications to work with databases. Contributed by Fredrik Hedberg and Enis Soztutar.
Added:
hadoop/core/trunk/lib/hsqldb-LICENSE.txt
hadoop/core/trunk/lib/hsqldb.jar (with props)
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697184&r1=697183&r2=697184&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 11:51:41 2008
@@ -188,6 +188,10 @@
HADOOP-4070. Provide a mechanism in Hive for registering UDFs from the
query language. (tomwhite)
+ HADOOP-2536. Implement JDBC-based database input and output formats to
+ allow Map-Reduce applications to work with databases. (Fredrik Hedberg
+ and Enis Soztutar via acmurthy)
+
IMPROVEMENTS
HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).
Added: hadoop/core/trunk/lib/hsqldb-LICENSE.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/hsqldb-LICENSE.txt?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/lib/hsqldb-LICENSE.txt (added)
+++ hadoop/core/trunk/lib/hsqldb-LICENSE.txt Fri Sep 19 11:51:41 2008
@@ -0,0 +1,66 @@
+/* Copyright (c) 1995-2000, The Hypersonic SQL Group.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the Hypersonic SQL Group nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE HYPERSONIC SQL GROUP,
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * This software consists of voluntary contributions made by many individuals
+ * on behalf of the Hypersonic SQL Group.
+ *
+ *
+ * For work added by the HSQL Development Group:
+ *
+ * Copyright (c) 2001-2004, The HSQL Development Group
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the HSQL Development Group nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL HSQL DEVELOPMENT GROUP, HSQLDB.ORG,
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
Added: hadoop/core/trunk/lib/hsqldb.jar
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/hsqldb.jar?rev=697184&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/core/trunk/lib/hsqldb.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.mapred.lib.db.DBConfiguration;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat;
+import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.hsqldb.Server;
+
+/**
+ * This is a demonstration program that uses DBInputFormat for reading
+ * the input data from a database, and DBOutputFormat for writing the data
+ * to the database.
+ * <br>
+ * The program first creates the necessary tables, populates the input table
+ * and runs the mapred job.
+ * <br>
+ * The input data is a mini access log, with a <code><url,referrer,time>
+ * </code> schema. The output is the number of pageviews of each url in the
+ * log, with the schema <code><url,pageview></code>.
+ *
+ * When called with no arguments the program starts a local HSQLDB server, and
+ * uses this database for storing/retrieving the data.
+ */
+public class DBCountPageView extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(DBCountPageView.class);
+
+ private Connection connection;
+ private boolean initialized = false;
+
+ private static final String[] AccessFieldNames = {"url", "referrer", "time"};
+ private static final String[] PageviewFieldNames = {"url", "pageview"};
+
+ private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
+ private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+ private Server server;
+
+ private void startHsqldbServer() {
+ server = new Server();
+ server.setDatabasePath(0,
+ System.getProperty("test.build.data",".") + "/URLAccess");
+ server.setDatabaseName(0, "URLAccess");
+ server.start();
+ }
+
+ private void createConnection(String driverClassName
+ , String url) throws Exception {
+
+ Class.forName(driverClassName);
+ connection = DriverManager.getConnection(url);
+ connection.setAutoCommit(false);
+ }
+
+ private void shutdown() throws SQLException {
+ connection.commit();
+ connection.close();
+
+ if(server != null) {
+ server.stop();
+ server.shutdown();
+ }
+ }
+
+ private void initialize(String driverClassName, String url)
+ throws Exception {
+ if(!this.initialized) {
+ if(driverClassName.equals(DRIVER_CLASS)) {
+ startHsqldbServer();
+ }
+ createConnection(driverClassName, url);
+ dropTables();
+ createTables();
+ populateAccess();
+ this.initialized = true;
+ }
+ }
+
+ private void dropTables() {
+ String dropAccess = "DROP TABLE Access";
+ String dropPageview = "DROP TABLE Pageview";
+
+ try {
+ Statement st = connection.createStatement();
+ st.executeUpdate(dropAccess);
+ st.executeUpdate(dropPageview);
+ connection.commit();
+ st.close();
+ }catch (SQLException ex) {
+ //ignore
+ }
+ }
+
+ private void createTables() throws SQLException {
+
+ String createAccess =
+ "CREATE TABLE " +
+ "Access(url VARCHAR(100) NOT NULL," +
+ " referrer VARCHAR(100)," +
+ " time BIGINT NOT NULL, " +
+ " PRIMARY KEY (url, time))";
+
+ String createPageview =
+ "CREATE TABLE " +
+ "Pageview(url VARCHAR(100) NOT NULL," +
+ " pageview BIGINT NOT NULL, " +
+ " PRIMARY KEY (url))";
+
+ Statement st = connection.createStatement();
+ try {
+ st.executeUpdate(createAccess);
+ st.executeUpdate(createPageview);
+ connection.commit();
+ } finally {
+ st.close();
+ }
+ }
+
+ /**
+ * Populates the Access table with generated records.
+ */
+ private void populateAccess() throws SQLException {
+
+ PreparedStatement statement = null ;
+ try {
+ statement = connection.prepareStatement(
+ "INSERT INTO Access(url, referrer, time)" +
+ " VALUES (?, ?, ?)");
+
+ Random random = new Random();
+
+ int time = random.nextInt(50) + 50;
+
+ final int PROBABILITY_PRECISION = 100; // 1 / 100
+ final int NEW_PAGE_PROBABILITY = 15; // 15 / 100
+
+
+ //Pages in the site :
+ String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
+ //linkMatrix[i] is the array of pages(indexes) that page_i links to.
+ int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
+ {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
+
+ //a mini model of user browsing a la pagerank
+ int currentPage = random.nextInt(pages.length);
+ String referrer = null;
+
+ for(int i=0; i<time; i++) {
+
+ statement.setString(1, pages[currentPage]);
+ statement.setString(2, referrer);
+ statement.setLong(3, i);
+ statement.execute();
+
+ int action = random.nextInt(PROBABILITY_PRECISION);
+
+ //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
+ if(action < NEW_PAGE_PROBABILITY) {
+ currentPage = random.nextInt(pages.length); // a random page
+ referrer = null;
+ }
+ else {
+ referrer = pages[currentPage];
+ action = random.nextInt(linkMatrix[currentPage].length);
+ currentPage = linkMatrix[currentPage][action];
+ }
+ }
+
+ connection.commit();
+
+ }catch (SQLException ex) {
+ connection.rollback();
+ throw ex;
+ } finally {
+ if(statement != null) {
+ statement.close();
+ }
+ }
+ }
+
+ /** Verifies the results are correct */
+ private boolean verify() throws SQLException {
+ //check total num pageview
+ String countAccessQuery = "SELECT COUNT(*) FROM Access";
+ String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ st = connection.createStatement();
+ rs = st.executeQuery(countAccessQuery);
+ rs.next();
+ long totalPageview = rs.getLong(1);
+
+ rs = st.executeQuery(sumPageviewQuery);
+ rs.next();
+ long sumPageview = rs.getLong(1);
+
+ LOG.info("totalPageview=" + totalPageview);
+ LOG.info("sumPageview=" + sumPageview);
+
+ return totalPageview == sumPageview && totalPageview != 0;
+ }finally {
+ if(st != null)
+ st.close();
+ if(rs != null)
+ rs.close();
+ }
+ }
+
+ /** Holds a <url, referrer, time > tuple */
+ static class AccessRecord implements Writable, DBWritable {
+ String url;
+ String referrer;
+ long time;
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.url = Text.readString(in);
+ this.referrer = Text.readString(in);
+ this.time = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, url);
+ Text.writeString(out, referrer);
+ out.writeLong(time);
+ }
+
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+ this.url = resultSet.getString(1);
+ this.referrer = resultSet.getString(2);
+ this.time = resultSet.getLong(3);
+ }
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ statement.setString(1, url);
+ statement.setString(2, referrer);
+ statement.setLong(3, time);
+ }
+ }
+ /** Holds a <url, pageview > tuple */
+ static class PageviewRecord implements Writable, DBWritable {
+ String url;
+ long pageview;
+
+ public PageviewRecord(String url, long pageview) {
+ this.url = url;
+ this.pageview = pageview;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.url = Text.readString(in);
+ this.pageview = in.readLong();
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, url);
+ out.writeLong(pageview);
+ }
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+ this.url = resultSet.getString(1);
+ this.pageview = resultSet.getLong(2);
+ }
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ statement.setString(1, url);
+ statement.setLong(2, pageview);
+ }
+ @Override
+ public String toString() {
+ return url + " " + pageview;
+ }
+ }
+
+ /**
+ * Mapper extracts URLs from the AccessRecord (tuples from db),
+ * and emits a <url,1> pair for each access record.
+ */
+ static class PageviewMapper extends MapReduceBase
+ implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
+
+ LongWritable ONE = new LongWritable(1L);
+ @Override
+ public void map(LongWritable key, AccessRecord value,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+
+ Text oKey = new Text(value.url);
+ output.collect(oKey, ONE);
+ }
+ }
+
+ /**
+ * Reducer sums up the pageviews and emits a PageviewRecord,
+ * which will correspond to one tuple in the db.
+ */
+ static class PageviewReducer extends MapReduceBase
+ implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
+
+ NullWritable n = NullWritable.get();
+ @Override
+ public void reduce(Text key, Iterator<LongWritable> values,
+ OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
+ throws IOException {
+
+ long sum = 0L;
+ while(values.hasNext()) {
+ sum += values.next().get();
+ }
+ output.collect(new PageviewRecord(key.toString(), sum), n);
+ }
+ }
+
+ @Override
+ //Usage DBCountPageView [driverClass dburl]
+ public int run(String[] args) throws Exception {
+
+ String driverClassName = DRIVER_CLASS;
+ String url = DB_URL;
+
+ if(args.length > 1) {
+ driverClassName = args[0];
+ url = args[1];
+ }
+
+ initialize(driverClassName, url);
+
+ JobConf job = new JobConf(getConf(), DBCountPageView.class);
+
+ job.setJobName("Count Pageviews of URLs");
+
+ job.setMapperClass(PageviewMapper.class);
+ job.setCombinerClass(LongSumReducer.class);
+ job.setReducerClass(PageviewReducer.class);
+
+ DBConfiguration.configureDB(job, driverClassName, url);
+
+ DBInputFormat.setInput(job, AccessRecord.class, "Access"
+ , null, "url", AccessFieldNames);
+
+ DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+
+ job.setOutputKeyClass(PageviewRecord.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ try {
+ JobClient.runJob(job);
+
+ boolean correct = verify();
+ if(!correct) {
+ throw new RuntimeException("Evaluation was not correct!");
+ }
+ } finally {
+ shutdown();
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new DBCountPageView(), args);
+ System.exit(ret);
+ }
+
+}
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=697184&r1=697183&r2=697184&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Fri Sep 19 11:51:41 2008
@@ -53,6 +53,7 @@
pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from several files.");
+ pgd.addClass("dbcount", DBCountPageView.class, "An example job that count the pageview counts from a database.");
pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
pgd.addClass("terasort", TeraSort.class, "Run the terasort");
pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat.NullDBWritable;
+
+/**
+ * A container for configuration property names for jobs with DB input/output.
+ * <br>
+ * The job can be configured using the static methods in this class,
+ * {@link DBInputFormat}, and {@link DBOutputFormat}.
+ * <p>
+ * Alternatively, the corresponding properties can be set directly in the
+ * configuration.
+ *
+ * @see DBConfiguration#configureDB(JobConf, String, String, String, String)
+ * @see DBInputFormat#setInput(JobConf, Class, String, String)
+ * @see DBInputFormat#setInput(JobConf, Class, String, String, String, String...)
+ * @see DBOutputFormat#setOutput(JobConf, String, String...)
+ */
+public class DBConfiguration {
+
+ /** The JDBC Driver class name */
+ public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+
+ /** JDBC Database access URL */
+ public static final String URL_PROPERTY = "mapred.jdbc.url";
+
+ /** User name to access the database */
+ public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+
+ /** Password to access the database */
+ public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+
+ /** Input table name */
+ public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+
+ /** Field names in the Input table */
+ public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+
+ /** WHERE clause in the input SELECT statement */
+ public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+
+ /** ORDER BY clause in the input SELECT statement */
+ public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+
+ /** Whole input query, excluding LIMIT...OFFSET */
+ public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+
+ /** Input query to get the count of records */
+ public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+
+ /** Class name implementing DBWritable which will hold input tuples */
+ public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+
+ /** Output table name */
+ public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+
+ /** Field names in the Output table */
+ public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";
+
+ /**
+ * Sets the DB access related fields in the JobConf.
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ * @param userName DB access username
+ * @param passwd DB access password
+ */
+ public static void configureDB(JobConf job, String driverClass, String dbUrl
+ , String userName, String passwd) {
+
+ job.set(DRIVER_CLASS_PROPERTY, driverClass);
+ job.set(URL_PROPERTY, dbUrl);
+ if(userName != null)
+ job.set(USERNAME_PROPERTY, userName);
+ if(passwd != null)
+ job.set(PASSWORD_PROPERTY, passwd);
+ }
+
+ /**
+ * Sets the DB access related fields in the JobConf.
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ */
+ public static void configureDB(JobConf job, String driverClass, String dbUrl) {
+ configureDB(job, driverClass, dbUrl, null, null);
+ }
+
+ private JobConf job;
+
+ DBConfiguration(JobConf job) {
+ this.job = job;
+ }
+
+ /** Returns a connection object to the DB
+ * @throws ClassNotFoundException
+ * @throws SQLException */
+ Connection getConnection() throws ClassNotFoundException, SQLException{
+
+ Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
+
+ if(job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
+ return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
+ } else {
+ return DriverManager.getConnection(
+ job.get(DBConfiguration.URL_PROPERTY),
+ job.get(DBConfiguration.USERNAME_PROPERTY),
+ job.get(DBConfiguration.PASSWORD_PROPERTY));
+ }
+ }
+
+ String getInputTableName() {
+ return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
+ }
+
+ void setInputTableName(String tableName) {
+ job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ }
+
+ String[] getInputFieldNames() {
+ return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
+ }
+
+ void setInputFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ String getInputConditions() {
+ return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
+ }
+
+ void setInputConditions(String conditions) {
+ if (conditions != null && conditions.length() > 0)
+ job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+ }
+
+ String getInputOrderBy() {
+ return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
+ }
+
+ void setInputOrderBy(String orderby) {
+ if(orderby != null && orderby.length() >0) {
+ job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
+ }
+ }
+
+ String getInputQuery() {
+ return job.get(DBConfiguration.INPUT_QUERY);
+ }
+
+ void setInputQuery(String query) {
+ if(query != null && query.length() >0) {
+ job.set(DBConfiguration.INPUT_QUERY, query);
+ }
+ }
+
+ String getInputCountQuery() {
+ return job.get(DBConfiguration.INPUT_COUNT_QUERY);
+ }
+
+ void setInputCountQuery(String query) {
+ if(query != null && query.length() >0) {
+ job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
+ }
+ }
+
+
+ Class<?> getInputClass() {
+ return job.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, NullDBWritable.class);
+ }
+
+ void setInputClass(Class<? extends DBWritable> inputClass) {
+ job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
+ }
+
+ String getOutputTableName() {
+ return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
+ }
+
+ void setOutputTableName(String tableName) {
+ job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+ }
+
+ String[] getOutputFieldNames() {
+ return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
+ }
+
+ void setOutputFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+}
+
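
For reference, a minimal sketch of the two equivalent ways to wire up DB access with the class above: through the static configureDB() helper, or by setting the named properties directly. The HSQLDB driver and URL echo the example program; the "sa" credentials are placeholders, not part of this commit.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;

    public class DBConfSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();

        // Preferred: the static helper sets all four properties at once.
        DBConfiguration.configureDB(job, "org.hsqldb.jdbcDriver",
            "jdbc:hsqldb:hsql://localhost/URLAccess", "sa", "");

        // Equivalent: set the named properties one by one.
        job.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "org.hsqldb.jdbcDriver");
        job.set(DBConfiguration.URL_PROPERTY,
            "jdbc:hsqldb:hsql://localhost/URLAccess");
        job.set(DBConfiguration.USERNAME_PROPERTY, "sa");
        job.set(DBConfiguration.PASSWORD_PROPERTY, "");
      }
    }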
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An InputFormat that reads input data from an SQL table.
+ * <p>
+ * DBInputFormat emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ *
+ * The SQL query and input class can be set using one of the two
+ * setInput methods.
+ */
+public class DBInputFormat<T extends DBWritable>
+ implements InputFormat<LongWritable, T>, JobConfigurable {
+ /**
+ * A RecordReader that reads records from an SQL table.
+ * Emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ */
+ protected class DBRecordReader implements
+ RecordReader<LongWritable, T> {
+ private ResultSet results;
+
+ private Statement statement;
+
+ private Class<T> inputClass;
+
+ private JobConf job;
+
+ private DBInputSplit split;
+
+ private long pos = 0;
+
+ /**
+ * @param split The InputSplit to read data for
+ * @throws SQLException
+ */
+ protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) throws SQLException {
+ this.inputClass = inputClass;
+ this.split = split;
+ this.job = job;
+
+ statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+ //statement.setFetchSize(Integer.MIN_VALUE);
+ results = statement.executeQuery(getSelectQuery());
+ }
+
+ /** Returns the query for selecting the records;
+ * subclasses can override this for custom behaviour. */
+ protected String getSelectQuery() {
+ StringBuilder query = new StringBuilder();
+
+ if(dbConf.getInputQuery() == null) {
+ query.append("SELECT ");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+ if(i != fieldNames.length -1) {
+ query.append(", ");
+ }
+ }
+
+ query.append(" FROM ").append(tableName);
+ query.append(" AS ").append(tableName); //in hsqldb this is necessary
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE (").append(conditions).append(")");
+ String orderBy = dbConf.getInputOrderBy();
+ if(orderBy != null && orderBy.length() > 0) {
+ query.append(" ORDER BY ").append(orderBy);
+ }
+ }
+ else {
+ query.append(dbConf.getInputQuery());
+ }
+
+ try {
+ query.append(" LIMIT ").append(split.getLength());
+ query.append(" OFFSET ").append(split.getStart());
+ }
+ catch (IOException ex) {
+ //ignore, will not throw
+ }
+ return query.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ try {
+ connection.commit();
+ results.close();
+ statement.close();
+ } catch (SQLException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ /** {@inheritDoc} */
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ /** {@inheritDoc} */
+ public T createValue() {
+ return ReflectionUtils.newInstance(inputClass, job);
+ }
+
+ /** {@inheritDoc} */
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ /** {@inheritDoc} */
+ public float getProgress() throws IOException {
+ return pos / (float)split.getLength();
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(LongWritable key, T value) throws IOException {
+ try {
+ if (!results.next())
+ return false;
+
+ // Set the key field value as the output key value
+ key.set(pos + split.getStart());
+
+ value.readFields(results);
+
+ pos ++;
+ } catch (SQLException e) {
+ throw new IOException(e.getMessage());
+ }
+ return true;
+ }
+ }
+
+ /**
+ * A class that does nothing, implementing both Writable and DBWritable
+ */
+ public static class NullDBWritable implements DBWritable, Writable {
+ @Override
+ public void readFields(DataInput in) throws IOException { }
+ @Override
+ public void readFields(ResultSet arg0) throws SQLException { }
+ @Override
+ public void write(DataOutput out) throws IOException { }
+ @Override
+ public void write(PreparedStatement arg0) throws SQLException { }
+ }
+ /**
+ * An InputSplit that spans a set of rows
+ */
+ protected static class DBInputSplit implements InputSplit {
+
+ private long end = 0;
+ private long start = 0;
+
+ /**
+ * Default Constructor
+ */
+ public DBInputSplit() {
+ }
+
+ /**
+ * Convenience Constructor
+ * @param start the index of the first row to select
+ * @param end the index of the last row to select
+ */
+ public DBInputSplit(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ /** {@inheritDoc} */
+ public String[] getLocations() throws IOException {
+ // TODO Add a layer to enable SQL "sharding" and support locality
+ return new String[] {};
+ }
+
+ /**
+ * @return The index of the first row to select
+ */
+ public long getStart() {
+ return start;
+ }
+
+ /**
+ * @return The index of the last row to select
+ */
+ public long getEnd() {
+ return end;
+ }
+
+ /**
+ * @return The total row count in this split
+ */
+ public long getLength() throws IOException {
+ return end - start;
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput input) throws IOException {
+ start = input.readLong();
+ end = input.readLong();
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput output) throws IOException {
+ output.writeLong(start);
+ output.writeLong(end);
+ }
+ }
+
+ private String conditions;
+
+ private Connection connection;
+
+ private String tableName;
+
+ private String[] fieldNames;
+
+ private DBConfiguration dbConf;
+
+ /** {@inheritDoc} */
+ public void configure(JobConf job) {
+
+ dbConf = new DBConfiguration(job);
+
+ try {
+ this.connection = dbConf.getConnection();
+ this.connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ tableName = dbConf.getInputTableName();
+ fieldNames = dbConf.getInputFieldNames();
+ conditions = dbConf.getInputConditions();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+
+ Class inputClass = dbConf.getInputClass();
+ try {
+ return new DBRecordReader((DBInputSplit) split, inputClass, job);
+ }
+ catch (SQLException ex) {
+ throw new IOException(ex.getMessage());
+ }
+ }
+
+ /** {@inheritDoc} */
+ public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
+
+ try {
+ Statement statement = connection.createStatement();
+
+ ResultSet results = statement.executeQuery(getCountQuery());
+ results.next();
+
+ long count = results.getLong(1);
+ long chunkSize = (count / chunks);
+
+ results.close();
+ statement.close();
+
+ InputSplit[] splits = new InputSplit[chunks];
+
+ // Split the rows into n-number of chunks and adjust the last chunk
+ // accordingly
+ for (int i = 0; i < chunks; i++) {
+ DBInputSplit split;
+
+ if ((i + 1) == chunks)
+ split = new DBInputSplit(i * chunkSize, count);
+ else
+ split = new DBInputSplit(i * chunkSize, (i * chunkSize)
+ + chunkSize);
+
+ splits[i] = split;
+ }
+
+ return splits;
+ } catch (SQLException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ /** Returns the query for getting the total number of rows;
+ * subclasses can override this for custom behaviour. */
+ protected String getCountQuery() {
+
+ if(dbConf.getInputCountQuery() != null) {
+ return dbConf.getInputCountQuery();
+ }
+
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT COUNT(*) FROM " + tableName);
+
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE " + conditions);
+ return query.toString();
+ }
+
+ /**
+ * Initializes the map-part of the job with the appropriate input settings.
+ *
+ * @param job The job
+ * @param inputClass the class object implementing DBWritable, which is the
+ * Java object holding tuple fields.
+ * @param tableName The table to read data from
+ * @param conditions The conditions with which to select data, e.g.
+ * '(updated > 20070101 AND length > 0)'
+ * @param orderBy the fieldNames in the orderBy clause.
+ * @param fieldNames The field names in the table
+ * @see #setInput(JobConf, Class, String, String)
+ */
+ public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+ String tableName,String conditions, String orderBy, String... fieldNames) {
+ job.setInputFormat(DBInputFormat.class);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+ dbConf.setInputClass(inputClass);
+ dbConf.setInputTableName(tableName);
+ dbConf.setInputFieldNames(fieldNames);
+ dbConf.setInputConditions(conditions);
+ dbConf.setInputOrderBy(orderBy);
+ }
+
+ /**
+ * Initializes the map-part of the job with the appropriate input settings.
+ *
+ * @param job The job
+ * @param inputClass the class object implementing DBWritable, which is the
+ * Java object holding tuple fields.
+ * @param inputQuery the input query to select fields. Example:
+ * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
+ * @param inputCountQuery the input query that returns the number of records in
+ * the table.
+ * Example : "SELECT COUNT(f1) FROM Mytable"
+ * @see #setInput(JobConf, Class, String, String, String, String...)
+ */
+ public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+ String inputQuery, String inputCountQuery) {
+ job.setInputFormat(DBInputFormat.class);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+ dbConf.setInputClass(inputClass);
+ dbConf.setInputQuery(inputQuery);
+ dbConf.setInputCountQuery(inputCountQuery);
+
+ }
+}
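
A minimal sketch of the query-based setInput() overload above. NullDBWritable (added in this commit) keeps the sketch self-contained; a real job would pass its own DBWritable implementation, such as AccessRecord in the example program. Note that the record reader appends LIMIT and OFFSET per split, so neither query should include them.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;

    public class DBInputSketch {
      public static void configureInput(JobConf job) {
        // The first query selects the records; the second returns the total
        // row count used by getSplits(). LIMIT/OFFSET are appended by the
        // framework and must not appear here.
        DBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
            "SELECT url, referrer, time FROM Access ORDER BY url",
            "SELECT COUNT(url) FROM Access");
      }
    }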
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An OutputFormat that sends the reduce output to an SQL table.
+ * <p>
+ * {@link DBOutputFormat} accepts <key,value> pairs, where
+ * key has a type extending DBWritable. The returned {@link RecordWriter}
+ * writes <b>only the key</b> to the database with a batch SQL query.
+ *
+ */
+public class DBOutputFormat<K extends DBWritable, V>
+implements OutputFormat<K,V> {
+
+ private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+
+ /**
+ * A RecordWriter that writes the reduce output to a SQL table
+ */
+ protected class DBRecordWriter
+ implements RecordWriter<K, V> {
+
+ private Connection connection;
+ private PreparedStatement statement;
+
+ protected DBRecordWriter(Connection connection
+ , PreparedStatement statement) throws SQLException {
+ this.connection = connection;
+ this.statement = statement;
+ this.connection.setAutoCommit(false);
+ }
+
+ /** {@inheritDoc} */
+ public void close(Reporter reporter) throws IOException {
+ try {
+ statement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ try {
+ connection.rollback();
+ }
+ catch (SQLException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ throw new IOException(e.getMessage());
+ } finally {
+ try {
+ statement.close();
+ connection.close();
+ }
+ catch (SQLException ex) {
+ throw new IOException(ex.getMessage());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void write(K key, V value) throws IOException {
+ try {
+ key.write(statement);
+ statement.addBatch();
+ } catch (SQLException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Constructs the query used as the prepared statement to insert data.
+ */
+ protected String constructQuery(String table, String[] fieldNames) {
+ StringBuilder query = new StringBuilder();
+ query.append("INSERT INTO ").append(table);
+ query.append(" VALUES (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append("?");
+ if(i != fieldNames.length - 1) {
+ query.append(",");
+ }
+ }
+ query.append(");");
+
+ return query.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(FileSystem filesystem, JobConf job)
+ throws IOException {
+ }
+
+
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
+ JobConf job, String name, Progressable progress) throws IOException {
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+ String tableName = dbConf.getOutputTableName();
+ String[] fieldNames = dbConf.getOutputFieldNames();
+
+ try {
+ Connection connection = dbConf.getConnection();
+ PreparedStatement statement = null;
+
+ statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
+ return new DBRecordWriter(connection, statement);
+ }
+ catch (Exception ex) {
+ throw new IOException(ex.getMessage());
+ }
+ }
+
+ /**
+ * Initializes the reduce-part of the job with the appropriate output settings
+ *
+ * @param job The job
+ * @param tableName The table to insert data into
+ * @param fieldNames The field names in the table
+ */
+ public static void setOutput(JobConf job, String tableName, String... fieldNames) {
+ job.setOutputFormat(DBOutputFormat.class);
+ job.setReduceSpeculativeExecution(false);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setOutputTableName(tableName);
+ dbConf.setOutputFieldNames(fieldNames);
+ }
+}
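
A minimal sketch of the reduce-side wiring. The table and field names echo the Pageview table from the example program; NullDBWritable stands in for a real record class (like PageviewRecord) so the sketch compiles on its own. Since the returned RecordWriter writes only keys, the value class is NullWritable.

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;
    import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

    public class DBOutputSketch {
      public static void configureOutput(JobConf job) {
        // Builds the prepared statement "INSERT INTO Pageview VALUES (?,?);".
        // setOutput() also disables speculative reduce execution, since a
        // second attempt of the same reduce could insert duplicate rows.
        DBOutputFormat.setOutput(job, "Pageview", "url", "pageview");

        // Only the key reaches the database, so it must be the DBWritable.
        job.setOutputKeyClass(DBInputFormat.NullDBWritable.class);
        job.setOutputValueClass(NullWritable.class);
      }
    }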
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,75 @@
+package org.apache.hadoop.mapred.lib.db;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Objects that are read from/written to a database should implement
+ * <code>DBWritable</code>. DBWritable is similar to {@link Writable}
+ * except that the {@link #write(PreparedStatement)} method takes a
+ * {@link PreparedStatement}, and {@link #readFields(ResultSet)}
+ * takes a {@link ResultSet}.
+ * <p>
+ * Implementations are responsible for writing the fields of the object
+ * to the PreparedStatement, and reading the fields of the object from the
+ * ResultSet.
+ *
+ * <p>Example:</p>
+ * If we have the following table in the database:
+ * <pre>
+ * CREATE TABLE MyTable (
+ * counter INTEGER NOT NULL,
+ * timestamp BIGINT NOT NULL
+ * );
+ * </pre>
+ * then we can read/write the tuples from/to the table with:
+ * <p><pre>
+ * public class MyWritable implements Writable, DBWritable {
+ * // Some data
+ * private int counter;
+ * private long timestamp;
+ *
+ * //Writable#write() implementation
+ * public void write(DataOutput out) throws IOException {
+ * out.writeInt(counter);
+ * out.writeLong(timestamp);
+ * }
+ *
+ * //Writable#readFields() implementation
+ * public void readFields(DataInput in) throws IOException {
+ * counter = in.readInt();
+ * timestamp = in.readLong();
+ * }
+ *
+ * public void write(PreparedStatement statement) throws SQLException {
+ * statement.setInt(1, counter);
+ * statement.setLong(2, timestamp);
+ * }
+ *
+ * public void readFields(ResultSet resultSet) throws SQLException {
+ * counter = resultSet.getInt(1);
+ * timestamp = resultSet.getLong(2);
+ * }
+ * }
+ * </pre></p>
+ */
+public interface DBWritable {
+
+ /**
+ * Sets the fields of the object in the {@link PreparedStatement}.
+ * @param statement the statement that the fields are put into.
+ * @throws SQLException
+ */
+ public void write(PreparedStatement statement) throws SQLException;
+
+ /**
+ * Reads the fields of the object from the {@link ResultSet}.
+ * @param resultSet the {@link ResultSet} to get the fields from.
+ * @throws SQLException
+ */
+ public void readFields(ResultSet resultSet) throws SQLException ;
+
+}
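
To tie the javadoc example together: once MyWritable above is defined, it plugs into a job through the table-based setInput() variant. A sketch, assuming MyWritable is on the classpath:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;

    public class MyWritableSketch {
      public static void configureInput(JobConf job) {
        // Generates roughly:
        //   SELECT counter, timestamp FROM MyTable AS MyTable ORDER BY counter
        // with LIMIT/OFFSET appended per split.
        DBInputFormat.setInput(job, MyWritable.class, "MyTable",
            null /* no WHERE conditions */, "counter",
            "counter", "timestamp");
      }
    }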
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/package.html Fri Sep 19 11:51:41 2008
@@ -0,0 +1,44 @@
+<html>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+<h2>org.apache.hadoop.mapred.lib.db Package</h2>
+<p>
+This package contains a library to read records from a database as an
+input to a mapreduce job, and write the output records to the database.
+</p>
+<p>
+The database to access can be configured using the static methods in the
+DBConfiguration class. Jobs reading input from a database should use
+DBInputFormat#setInput() to set the configuration, and jobs writing
+their output to the database should use DBOutputFormat#setOutput().
+</p>
+<p>
+Tuples from/to the database are converted to/from Java objects using
+DBWritable methods. Typically, for each table in the db, a class implementing
+DBWritable is defined, which holds the fields of the tuple. The fields
+of a record are read from the database using DBWritable#readFields(ResultSet),
+and written to the database using DBWritable#write(PreparedStatement
+statement).
+</p>
+<p>
+An example program using both DBInputFormat and DBOutputFormat can be found
+at src/examples/org/apache/hadoop/examples/DBCountPageView.java.
+</p>
+</body>
+</html>
\ No newline at end of file
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java?rev=697184&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/db/TestDBJob.java Fri Sep 19 11:51:41 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib.db;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.DBCountPageView;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class TestDBJob extends HadoopTestCase {
+
+ public TestDBJob() throws IOException {
+ super(CLUSTER_MR, DFS_FS, 3, 1);
+ }
+
+ public void testRun() throws Exception {
+
+ DBCountPageView testDriver = new DBCountPageView();
+
+ ToolRunner.run(createJobConf(), testDriver, new String[0]);
+
+ }
+
+}