Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC

svn commit: r1190489 [4/6] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.util.LoggingUtils;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Abstract RecordWriter base class that buffers SqoopRecords, injects them
+ * into JDBC PreparedStatements, and hands those statements to the
+ * AsyncSqlOutputFormat's background thread for execution.
+ *
+ * Record objects are buffered before actually performing the INSERT
+ * statements; this requires that the key implement the SqoopRecord interface.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V>
+    extends RecordWriter<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(AsyncSqlRecordWriter.class);
+
+  private Connection connection;
+
+  private Configuration conf;
+
+  protected final int rowsPerStmt; // rows to insert per statement.
+
+  // Buffer for records to be put into export SQL statements.
+  private List<SqoopRecord> records;
+
+  // Background thread to actually perform the updates.
+  private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
+  private boolean startedExecThread;
+
+  public AsyncSqlRecordWriter(TaskAttemptContext context)
+      throws ClassNotFoundException, SQLException {
+    this.conf = context.getConfiguration();
+
+    this.rowsPerStmt = conf.getInt(
+        AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
+        AsyncSqlOutputFormat.DEFAULT_RECORDS_PER_STATEMENT);
+    int stmtsPerTx = conf.getInt(
+        AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY,
+        AsyncSqlOutputFormat.DEFAULT_STATEMENTS_PER_TRANSACTION);
+
+    DBConfiguration dbConf = new DBConfiguration(conf);
+    this.connection = dbConf.getConnection();
+    this.connection.setAutoCommit(false);
+
+    this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
+
+    this.execThread = new AsyncSqlOutputFormat.AsyncSqlExecThread(
+        connection, stmtsPerTx);
+    this.execThread.setDaemon(true);
+    this.startedExecThread = false;
+  }
+
+  /**
+   * Allow subclasses access to the Connection instance we hold.
+   * This Connection is shared with the asynchronous SQL exec thread.
+   * Any uses of the Connection must be synchronized on it.
+   * @return the Connection object used for this SQL transaction.
+   */
+  protected final Connection getConnection() {
+    return this.connection;
+  }
+
+  /**
+   * Allow subclasses access to the Configuration.
+   * @return the Configuration for this MapReduce task.
+   */
+  protected final Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Should return 'true' if the PreparedStatements generated by the
+   * RecordWriter are intended to be executed in JDBC "batch" mode, or
+   * 'false' if each is a single multi-row statement.
+   */
+  protected boolean isBatchExec() {
+    return false;
+  }
+
+  /**
+   * Generate the PreparedStatement object that will be fed into the execution
+   * thread. All parameterized fields of the PreparedStatement must be set in
+   * this method as well; this is usually based on the records collected from
+   * the user in the userRecords list.
+   *
+   * Note that any uses of the Connection object here must be synchronized on
+   * the Connection.
+   *
+   * @param userRecords a list of records that should be injected into SQL
+   * statements.
+   * @return a PreparedStatement to be populated with rows
+   * from the collected record list.
+   */
+  protected abstract PreparedStatement getPreparedStatement(
+      List<SqoopRecord> userRecords) throws SQLException;
+
+  /**
+   * Takes the current contents of 'records' and formats and executes the
+   * INSERT statement.
+   * @param commit if true, commits the current transaction after the
+   * statement executes.
+   * @param stopThread if true, signals the background exec thread to stop
+   * once this operation has been processed.
+   */
+  private void execUpdate(boolean commit, boolean stopThread)
+      throws InterruptedException, SQLException {
+
+    if (!startedExecThread) {
+      this.execThread.start();
+      this.startedExecThread = true;
+    }
+
+    PreparedStatement stmt = null;
+    boolean successfulPut = false;
+    try {
+      if (records.size() > 0) {
+        stmt = getPreparedStatement(records);
+        this.records.clear();
+      }
+
+      // Pass this operation off to the update thread. This will block if
+      // the update thread is already performing an update.
+      AsyncSqlOutputFormat.AsyncDBOperation op =
+          new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(),
+                  commit, stopThread);
+      execThread.put(op);
+      successfulPut = true; // op has been posted to the other thread.
+    } finally {
+      if (!successfulPut && null != stmt) {
+        // We created a statement but failed to enqueue it. Close it.
+        stmt.close();
+      }
+    }
+
+    // Check for any previous SQLException. If one happened, rethrow it here.
+    SQLException lastException = execThread.getLastError();
+    if (null != lastException) {
+      LoggingUtils.logAll(LOG, lastException);
+      throw lastException;
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    try {
+      try {
+        execUpdate(true, true);
+        execThread.join();
+      } catch (SQLException sqle) {
+        throw new IOException(sqle);
+      }
+
+      // If we're not leaving on an error return path already,
+      // now that execThread is definitely stopped, check that the
+      // error slot remains empty.
+      SQLException lastErr = execThread.getLastError();
+      if (null != lastErr) {
+        throw new IOException(lastErr);
+      }
+    } finally {
+      try {
+        closeConnection(context);
+      } catch (SQLException sqle) {
+        throw new IOException(sqle);
+      }
+    }
+  }
+
+  public void closeConnection(TaskAttemptContext context)
+      throws SQLException {
+    this.connection.close();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public void write(K key, V value)
+      throws InterruptedException, IOException {
+    try {
+      records.add((SqoopRecord) key.clone());
+      if (records.size() >= this.rowsPerStmt) {
+        execUpdate(false, false);
+      }
+    } catch (CloneNotSupportedException cnse) {
+      throw new IOException("Could not buffer record", cnse);
+    } catch (SQLException sqlException) {
+      throw new IOException(sqlException);
+    }
+  }
+}
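
For illustration only (not part of this commit), a concrete subclass mainly has to supply getPreparedStatement(). The sketch below builds one multi-row INSERT for a hypothetical two-column table "foo"; the table name, column count, and the assumption that SqoopRecord.write(PreparedStatement, int) binds a record's fields at the given parameter offset and returns the number of fields bound are all illustrative, not taken from this commit.

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.cloudera.sqoop.lib.SqoopRecord;

public class FooRecordWriter
    extends AsyncSqlRecordWriter<SqoopRecord, NullWritable> {

  public FooRecordWriter(TaskAttemptContext context)
      throws ClassNotFoundException, SQLException {
    super(context);
  }

  @Override
  protected PreparedStatement getPreparedStatement(
      List<SqoopRecord> userRecords) throws SQLException {
    // Build one multi-row INSERT: INSERT INTO foo VALUES (?, ?), (?, ?), ...
    StringBuilder sql = new StringBuilder("INSERT INTO foo VALUES ");
    for (int i = 0; i < userRecords.size(); i++) {
      sql.append(i == 0 ? "(?, ?)" : ", (?, ?)");
    }

    PreparedStatement stmt;
    // All uses of the shared Connection must be synchronized on it.
    synchronized (getConnection()) {
      stmt = getConnection().prepareStatement(sql.toString());
    }

    int offset = 0;
    for (SqoopRecord rec : userRecords) {
      // Assumed contract: binds this record's fields starting at 'offset'
      // and returns the number of fields it bound.
      offset += rec.write(stmt, offset);
    }
    return stmt;
  }
}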

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Identity mapper that continuously reports progress via a background thread.
+ */
+public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
+    extends Mapper<KEYIN, VALIN, KEYOUT, VALOUT> {
+
+  public static final Log LOG = LogFactory.getLog(
+      AutoProgressMapper.class.getName());
+
+  /**
+   * Total number of millis for which progress will be reported by the
+   * auto-progress thread. If this is zero, then the auto-progress thread will
+   * never voluntarily exit.
+   */
+  private int maxProgressPeriod;
+
+  /**
+   * Number of milliseconds to sleep between loop iterations. Must be less
+   * than the report interval.
+   */
+  private int sleepInterval;
+
+  /**
+   * Number of milliseconds between calls to Reporter.progress().
+   * Should be a multiple of the sleepInterval.
+   */
+  private int reportInterval;
+
+  public static final String MAX_PROGRESS_PERIOD_KEY =
+      "sqoop.mapred.auto.progress.max";
+  public static final String SLEEP_INTERVAL_KEY =
+      "sqoop.mapred.auto.progress.sleep";
+  public static final String REPORT_INTERVAL_KEY =
+      "sqoop.mapred.auto.progress.report";
+
+  // Sleep for 10 seconds at a time.
+  public static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+  // Report progress every 30 seconds.
+  public static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+  // Disable max progress, by default.
+  public static final int DEFAULT_MAX_PROGRESS = 0;
+
+  private class ProgressThread extends Thread {
+
+    private volatile boolean keepGoing; // While this is true, thread runs.
+
+    private Context context;
+    private long startTimeMillis;
+    private long lastReportMillis;
+
+    public ProgressThread(final Context ctxt) {
+      this.context = ctxt;
+      this.keepGoing = true;
+    }
+
+    public void signalShutdown() {
+      this.keepGoing = false; // volatile update.
+      this.interrupt();
+    }
+
+    public void run() {
+      this.lastReportMillis = System.currentTimeMillis();
+      this.startTimeMillis = this.lastReportMillis;
+
+      final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
+      final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
+      final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
+
+      // In a loop:
+      //   * Check that we haven't run for too long (maxProgressPeriod).
+      //   * If it's been a report interval since we last made progress,
+      //     make more.
+      //   * Sleep for a bit.
+      //   * If the parent thread has signaled for exit, do so.
+      while (this.keepGoing) {
+        long curTimeMillis = System.currentTimeMillis();
+
+        if (MAX_PROGRESS != 0
+            && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
+          this.keepGoing = false;
+          LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS
+              + " ms.");
+          break;
+        }
+
+        if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+          // It's been a full report interval -- claim progress.
+          LOG.debug("Auto-progress thread reporting progress");
+          this.context.progress();
+          this.lastReportMillis = curTimeMillis;
+        }
+
+        // Unless we got an interrupt while we were working,
+        // sleep a bit before doing more work.
+        if (!Thread.interrupted()) {
+          try {
+            Thread.sleep(SLEEP_INTERVAL);
+          } catch (InterruptedException ie) {
+            // we were notified on something; not necessarily an error.
+          }
+        }
+      }
+
+      LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
+    }
+  }
+
+  /**
+   * Set configuration parameters for the auto-progress thread.
+   */
+  private void configureAutoProgress(Configuration job) {
+    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
+        DEFAULT_MAX_PROGRESS);
+    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
+        DEFAULT_SLEEP_INTERVAL);
+    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
+        DEFAULT_REPORT_INTERVAL);
+
+    if (this.reportInterval < 1) {
+      LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
+          + DEFAULT_REPORT_INTERVAL);
+      this.reportInterval = DEFAULT_REPORT_INTERVAL;
+    }
+
+    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+      LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
+          + DEFAULT_SLEEP_INTERVAL);
+      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+    }
+
+    if (this.maxProgressPeriod < 0) {
+      LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
+          + DEFAULT_MAX_PROGRESS);
+      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+    }
+  }
+
+
+  // map() method intentionally omitted; Mapper.map() is the identity mapper.
+
+
+  /**
+   * Run the mapping process for this task, wrapped in an auto-progress system.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    configureAutoProgress(context.getConfiguration());
+    ProgressThread thread = this.new ProgressThread(context);
+
+    try {
+      thread.setDaemon(true);
+      thread.start();
+
+      // use default run() method to actually drive the mapping.
+      super.run(context);
+    } finally {
+      // Tell the progress thread to exit.
+      LOG.debug("Instructing auto-progress thread to quit.");
+      thread.signalShutdown();
+      try {
+        // And wait for that to happen.
+        LOG.debug("Waiting for progress thread shutdown...");
+        thread.join();
+        LOG.debug("Progress thread shutdown detected.");
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted when waiting on auto-progress thread: "
+            + ie.toString());
+      }
+    }
+  }
+}
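
For illustration only, a job that wants to tune the auto-progress behavior can set the three configuration keys defined above before submission; the values shown here (in milliseconds) are arbitrary examples, not defaults from this commit.

import org.apache.hadoop.conf.Configuration;

public class AutoProgressConfigExample {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    conf.setInt(AutoProgressMapper.SLEEP_INTERVAL_KEY, 5000);    // wake every 5 s
    conf.setInt(AutoProgressMapper.REPORT_INTERVAL_KEY, 15000);  // call progress() every 15 s
    conf.setInt(AutoProgressMapper.MAX_PROGRESS_PERIOD_KEY, 0);  // 0 = never voluntarily exit
    return conf;
  }
}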

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import com.cloudera.sqoop.orm.ClassWriter;
+
+/**
+ * Exports records from an Avro data file.
+ */
+public class AvroExportMapper
+    extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable,
+              SqoopRecord, NullWritable> {
+
+  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+
+  private static final String TIME_TYPE = "java.sql.Time";
+
+  private static final String DATE_TYPE = "java.sql.Date";
+
+  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+
+  public static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map";
+
+  private MapWritable columnTypes;
+  private SqoopRecord recordImpl;
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    // Instantiate a copy of the user's class to hold and parse the record.
+    String recordClassName = conf.get(
+        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+          + ") is not set!");
+    }
+
+    try {
+      Class cls = Class.forName(recordClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == recordImpl) {
+      throw new IOException("Could not instantiate object of type "
+          + recordClassName);
+    }
+
+    columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
+        MapWritable.class);
+  }
+
+  @Override
+  protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
+      Context context) throws IOException, InterruptedException {
+    context.write(toSqoopRecord(key.datum()), NullWritable.get());
+  }
+
+  private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
+    Schema avroSchema = record.getSchema();
+    for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
+      String columnName = e.getKey().toString();
+      String columnType = e.getValue().toString();
+      String cleanedCol = ClassWriter.toIdentifier(columnName);
+      Field field = getField(avroSchema, cleanedCol, record);
+      if (field == null) {
+        throw new IOException("Cannot find field " + cleanedCol
+          + " in Avro schema " + avroSchema);
+      } else {
+        Object avroObject = record.get(field.name());
+        Object fieldVal = fromAvro(avroObject, field.schema(), columnType);
+        recordImpl.setField(cleanedCol, fieldVal);
+      }
+    }
+    return recordImpl;
+  }
+
+  private Field getField(Schema avroSchema, String fieldName,
+      GenericRecord record) {
+    for (Field field : avroSchema.getFields()) {
+      if (field.name().equalsIgnoreCase(fieldName)) {
+        return field;
+      }
+    }
+    return null;
+  }
+
+  private Object fromAvro(Object avroObject, Schema fieldSchema,
+      String columnType) {
+    // map from Avro type to Sqoop's Java representation of the SQL type
+    // see SqlManager#toJavaType
+
+    if (avroObject == null) {
+      return null;
+    }
+
+    switch (fieldSchema.getType()) {
+      case NULL:
+        return null;
+      case BOOLEAN:
+      case INT:
+      case FLOAT:
+      case DOUBLE:
+        return avroObject;
+      case LONG:
+        if (columnType.equals(DATE_TYPE)) {
+          return new Date((Long) avroObject);
+        } else if (columnType.equals(TIME_TYPE)) {
+          return new Time((Long) avroObject);
+        } else if (columnType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp((Long) avroObject);
+        }
+        return avroObject;
+      case BYTES:
+        ByteBuffer bb = (ByteBuffer) avroObject;
+        BytesWritable bw = new BytesWritable();
+        bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+        return bw;
+      case STRING:
+        if (columnType.equals(BIG_DECIMAL_TYPE)) {
+          return new BigDecimal(avroObject.toString());
+        } else if (columnType.equals(DATE_TYPE)) {
+          return Date.valueOf(avroObject.toString());
+        } else if (columnType.equals(TIME_TYPE)) {
+          return Time.valueOf(avroObject.toString());
+        } else if (columnType.equals(TIMESTAMP_TYPE)) {
+          return Timestamp.valueOf(avroObject.toString());
+        }
+        return avroObject.toString();
+      case ENUM:
+        return ((GenericEnumSymbol) avroObject).toString();
+      case UNION:
+        List<Schema> types = fieldSchema.getTypes();
+        if (types.size() != 2) {
+          throw new IllegalArgumentException("Only support union with null");
+        }
+        Schema s1 = types.get(0);
+        Schema s2 = types.get(1);
+        if (s1.getType() == Schema.Type.NULL) {
+          return fromAvro(avroObject, s2, columnType);
+        } else if (s2.getType() == Schema.Type.NULL) {
+          return fromAvro(avroObject, s1, columnType);
+        } else {
+          throw new IllegalArgumentException("Only support union with null");
+        }
+      case FIXED:
+        return new BytesWritable(((GenericFixed) avroObject).bytes());
+      case RECORD:
+      case ARRAY:
+      case MAP:
+      default:
+        throw new IllegalArgumentException("Cannot convert Avro type "
+            + fieldSchema.getType());
+    }
+  }
+
+}
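
The column-type map that setup() loads via DefaultStringifier has to be stored into the job configuration by the job-setup code. A hypothetical sketch of that producer side is shown below; the column names and types are made up, and real jobs would derive them from the database metadata rather than hard-coding them.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class ColumnTypesMapExample {
  public static void store(Configuration conf) throws IOException {
    MapWritable columnTypes = new MapWritable();
    // Hypothetical columns; real jobs derive these from the ConnManager.
    columnTypes.put(new Text("id"), new Text("java.lang.Integer"));
    columnTypes.put(new Text("created"), new Text("java.sql.Timestamp"));
    DefaultStringifier.store(conf, columnTypes,
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
  }
}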

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by transforming them to Avro records in an Avro data file.
+ */
+public class AvroImportMapper
+    extends AutoProgressMapper<LongWritable, SqoopRecord,
+    AvroWrapper<GenericRecord>, NullWritable> {
+
+  private final AvroWrapper<GenericRecord> wrapper =
+    new AvroWrapper<GenericRecord>();
+  private Schema schema;
+
+  @Override
+  protected void setup(Context context) {
+    schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+  }
+
+  @Override
+  protected void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+    wrapper.datum(toGenericRecord(val));
+    context.write(wrapper, NullWritable.get());
+  }
+
+
+  private GenericRecord toGenericRecord(SqoopRecord val) {
+    Map<String, Object> fieldMap = val.getFieldMap();
+    GenericRecord record = new GenericData.Record(schema);
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      record.put(entry.getKey(), toAvro(entry.getValue()));
+    }
+    return record;
+  }
+
+  /**
+   * Convert a Java value (already converted from its SQL equivalent) to the
+   * corresponding Avro representation.
+   * @param o the Java value to convert.
+   * @return the Avro-compatible value.
+   */
+  private Object toAvro(Object o) {
+    if (o instanceof BigDecimal) {
+      return o.toString();
+    } else if (o instanceof Date) {
+      return ((Date) o).getTime();
+    } else if (o instanceof Time) {
+      return ((Time) o).getTime();
+    } else if (o instanceof Timestamp) {
+      return ((Timestamp) o).getTime();
+    } else if (o instanceof BytesWritable) {
+      BytesWritable bw = (BytesWritable) o;
+      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
+    } else if (o instanceof ClobRef) {
+      throw new UnsupportedOperationException("ClobRef not supported");
+    } else if (o instanceof BlobRef) {
+      throw new UnsupportedOperationException("BlobRef not supported");
+    }
+    // primitive types (Integer, etc) are left unchanged
+    return o;
+  }
+
+
+}
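
As a rough illustration of the conversions performed by toAvro() above (assuming that method behaves as written): a java.sql date/time value becomes its epoch-millisecond long, a BigDecimal becomes its string form, a BytesWritable becomes a ByteBuffer over its valid byte range, and other primitives pass through unchanged. The values below are placeholders.

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import org.apache.hadoop.io.BytesWritable;

public class ToAvroConversions {
  public static void main(String[] args) {
    // java.math.BigDecimal -> String
    Object decimal = new BigDecimal("19.99").toString();     // "19.99"
    // java.sql.Date / Time / Timestamp -> long (epoch milliseconds)
    Object date = new Date(1319826136000L).getTime();        // 1319826136000L
    Object ts = new Timestamp(1319826136000L).getTime();     // 1319826136000L
    // BytesWritable -> ByteBuffer over the valid byte range
    BytesWritable bw = new BytesWritable(new byte[] {1, 2, 3});
    Object bytes = ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
    // ClobRef and BlobRef values are rejected with
    // UnsupportedOperationException.
    System.out.println(decimal + " " + date + " " + ts + " " + bytes);
  }
}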

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
+public class AvroInputFormat<T>
+  extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : super.listStatus(job)) {
+      if (file.getPath().getName().endsWith(
+          org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+        result.add(file);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    context.setStatus(split.toString());
+    return new AvroRecordReader<T>();
+  }
+
+}
+
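
For illustration only, wiring AvroInputFormat into a job that reads Avro data files might look like the sketch below; the input path is a placeholder, and the mapper would then receive AvroWrapper&lt;GenericRecord&gt; keys and NullWritable values, as AvroExportMapper does above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class AvroInputJobExample {
  public static void configure(Job job) throws Exception {
    job.setInputFormatClass(AvroInputFormat.class);
    // Placeholder path; only files ending in the Avro extension are read.
    FileInputFormat.addInputPath(job, new Path("/user/example/avro-input"));
  }
}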

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Helper class for setting up an Avro MapReduce job.
+ */
+public final class AvroJob {
+  public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+
+  private AvroJob() {
+  }
+
+  public static void setMapOutputSchema(Configuration job, Schema s) {
+    job.set(MAP_OUTPUT_SCHEMA, s.toString());
+  }
+
+  /** Return a job's map output key schema. */
+  public static Schema getMapOutputSchema(Configuration job) {
+    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
+public class AvroOutputFormat<T>
+  extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+  @Override
+  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+
+    Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+
+    final DataFileWriter<T> WRITER =
+      new DataFileWriter<T>(new GenericDatumWriter<T>());
+
+    Path path = getDefaultWorkFile(context,
+        org.apache.avro.mapred.AvroOutputFormat.EXT);
+    WRITER.create(schema,
+        path.getFileSystem(context.getConfiguration()).create(path));
+
+    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+      @Override
+      public void write(AvroWrapper<T> wrapper, NullWritable ignore)
+        throws IOException {
+        WRITER.append(wrapper.datum());
+      }
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        WRITER.close();
+      }
+    };
+  }
+
+}
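
For illustration only, a job writing Avro data files must place the schema in the configuration before tasks start, since getRecordWriter() above reads it back via AvroJob; the output path below is a placeholder.

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvroOutputJobExample {
  public static void configure(Job job, Schema schema) throws Exception {
    AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
    job.setOutputFormatClass(AvroOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/user/example/avro-output"));
  }
}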

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** A {@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T>
+  extends RecordReader<AvroWrapper<T>, NullWritable> {
+
+  private FileReader<T> reader;
+  private long start;
+  private long end;
+  private AvroWrapper<T> key;
+  private NullWritable value;
+
+  @Override
+  public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration conf = context.getConfiguration();
+    SeekableInput in = new FsInput(split.getPath(), conf);
+    DatumReader<T> datumReader = new GenericDatumReader<T>();
+    this.reader = DataFileReader.openReader(in, datumReader);
+    reader.sync(split.getStart());                    // sync to start
+    this.start = reader.tell();
+    this.end = split.getStart() + split.getLength();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!reader.hasNext() || reader.pastSync(end)) {
+      key = null;
+      value = null;
+      return false;
+    }
+    if (key == null) {
+      key = new AvroWrapper<T>();
+    }
+    if (value == null) {
+      value = NullWritable.get();
+    }
+    key.datum(reader.next(key.datum()));
+    return true;
+  }
+
+  @Override
+  public AvroWrapper<T> getCurrentKey() throws IOException,
+      InterruptedException {
+    return key;
+  }
+
+  @Override
+  public NullWritable getCurrentValue()
+      throws IOException, InterruptedException {
+    return value;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return reader.tell();
+  }
+
+  @Override
+  public void close() throws IOException { reader.close(); }
+}
+
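
For illustration only, the same sync()/pastSync() pattern used by the reader above can be applied to a standalone read of one byte range of an Avro file; the path and offsets are placeholders, not part of this commit.

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class AvroRangeReadExample {
  public static void readRange(Path file, long start, long length)
      throws Exception {
    Configuration conf = new Configuration();
    FileReader<GenericRecord> reader = DataFileReader.openReader(
        new FsInput(file, conf), new GenericDatumReader<GenericRecord>());
    try {
      reader.sync(start);  // skip to the first block at or after 'start'
      while (reader.hasNext() && !reader.pastSync(start + length)) {
        GenericRecord rec = reader.next();
        System.out.println(rec);
      }
    } finally {
      reader.close();
    }
  }
}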

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * RecordReader that CombineFileRecordReader can instantiate, which itself
+ * translates a CombineFileSplit into a FileSplit.
+ */
+public class CombineShimRecordReader
+   extends RecordReader<LongWritable, Object> {
+
+  public static final Log LOG =
+     LogFactory.getLog(CombineShimRecordReader.class.getName());
+
+  private CombineFileSplit split;
+  private TaskAttemptContext context;
+  private int index;
+  private RecordReader<LongWritable, Object> rr;
+
+  /**
+   * Constructor invoked by CombineFileRecordReader that identifies part of a
+   * CombineFileSplit to use.
+   */
+  public CombineShimRecordReader(CombineFileSplit split,
+      TaskAttemptContext context, Integer index)
+      throws IOException, InterruptedException {
+    this.index = index;
+    this.split = (CombineFileSplit) split;
+    this.context = context;
+
+    createChildReader();
+  }
+
+  @Override
+  public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
+      throws IOException, InterruptedException {
+    this.split = (CombineFileSplit) curSplit;
+    this.context = curContext;
+
+    if (null == rr) {
+      createChildReader();
+    }
+
+    FileSplit fileSplit = new FileSplit(this.split.getPath(index),
+        this.split.getOffset(index), this.split.getLength(index),
+        this.split.getLocations());
+    this.rr.initialize(fileSplit, this.context);
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return rr.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != rr) {
+      rr.close();
+      rr = null;
+    }
+  }
+
+  @Override
+  public LongWritable getCurrentKey()
+      throws IOException, InterruptedException {
+    return rr.getCurrentKey();
+  }
+
+  @Override
+  public Object getCurrentValue()
+      throws IOException, InterruptedException {
+    return rr.getCurrentValue();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return rr.nextKeyValue();
+  }
+
+  /**
+   * Actually instantiate the user's chosen RecordReader implementation.
+   */
+  @SuppressWarnings("unchecked")
+  private void createChildReader() throws IOException, InterruptedException {
+    LOG.debug("ChildSplit operates on: " + split.getPath(index));
+
+    Configuration conf = context.getConfiguration();
+
+    // Determine the file format we're reading.
+    Class rrClass;
+    if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
+      rrClass = SequenceFileRecordReader.class;
+    } else {
+      rrClass = LineRecordReader.class;
+    }
+
+    // Create the appropriate record reader.
+    this.rr = (RecordReader<LongWritable, Object>)
+        ReflectionUtils.newInstance(rrClass, conf);
+  }
+}
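
For illustration only, an input format hands this shim to CombineFileRecordReader, which instantiates one CombineShimRecordReader per file chunk in the CombineFileSplit, passing it the chunk's index. Sqoop's export input format follows a similar pattern, but the class below is hypothetical.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class ExampleCombineInputFormat
    extends CombineFileInputFormat<LongWritable, Object> {

  @Override
  public RecordReader<LongWritable, Object> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader creates one CombineShimRecordReader per
    // chunk of the CombineFileSplit.
    return new CombineFileRecordReader<LongWritable, Object>(
        (CombineFileSplit) split, context, CombineShimRecordReader.class);
  }
}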

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.ImportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+
+/**
+ * Actually runs a jdbc import job using the ORM files generated by the
+ * sqoop.orm package. Uses DataDrivenDBInputFormat.
+ */
+public class DataDrivenImportJob extends ImportJobBase {
+
+  public static final Log LOG = LogFactory.getLog(
+      DataDrivenImportJob.class.getName());
+
+  @SuppressWarnings("unchecked")
+  public DataDrivenImportJob(final SqoopOptions opts) {
+    super(opts, null, DataDrivenDBInputFormat.class, null, null);
+  }
+
+  public DataDrivenImportJob(final SqoopOptions opts,
+      final Class<? extends InputFormat> inputFormatClass,
+      ImportJobContext context) {
+    super(opts, null, inputFormatClass, null, context);
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+      // For text files, specify these as the output types; for
+      // other types, we just use the defaults.
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(NullWritable.class);
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.AvroDataFile) {
+      ConnManager connManager = getContext().getConnManager();
+      AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+          connManager, tableName);
+      Schema schema = generator.generate();
+      AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
+    }
+
+    job.setMapperClass(getMapperClass());
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+      return TextImportMapper.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.SequenceFile) {
+      return SequenceFileImportMapper.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.AvroDataFile) {
+      return AvroImportMapper.class;
+    }
+
+    return null;
+  }
+
+  @Override
+  protected Class<? extends OutputFormat> getOutputFormatClass()
+      throws ClassNotFoundException {
+    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+      return RawKeyTextOutputFormat.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.SequenceFile) {
+      return SequenceFileOutputFormat.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.AvroDataFile) {
+      return AvroOutputFormat.class;
+    }
+
+    return null;
+  }
+
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol) throws IOException {
+    ConnManager mgr = getContext().getConnManager();
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(), options.getConnectString(),
+            options.getFetchSize());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(), options.getConnectString(),
+            username, options.getPassword(), options.getFetchSize());
+      }
+
+      if (null != tableName) {
+        // Import a table.
+        String [] colNames = options.getColumns();
+        if (null == colNames) {
+          colNames = mgr.getColumnNames(tableName);
+        }
+
+        String [] sqlColNames = null;
+        if (null != colNames) {
+          sqlColNames = new String[colNames.length];
+          for (int i = 0; i < colNames.length; i++) {
+            sqlColNames[i] = mgr.escapeColName(colNames[i]);
+          }
+        }
+
+        // It's ok if the where clause is null in DBInputFormat.setInput.
+        String whereClause = options.getWhereClause();
+
+        // We can't set the class properly in here, because we may not have the
+        // jar loaded in this JVM. So we start by calling setInput() with
+        // DBWritable and then overriding the string manually.
+        DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+            mgr.escapeTableName(tableName), whereClause,
+            mgr.escapeColName(splitByCol), sqlColNames);
+
+        // If user specified boundary query on the command line propagate it to
+        // the job
+        if(options.getBoundaryQuery() != null) {
+          DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(),
+                  options.getBoundaryQuery());
+        }
+      } else {
+        // Import a free-form query.
+        String inputQuery = options.getSqlQuery();
+        String sanitizedQuery = inputQuery.replace(
+            DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
+
+        String inputBoundingQuery = options.getBoundaryQuery();
+
+        if(inputBoundingQuery == null) {
+          inputBoundingQuery =
+            mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
+          if (inputBoundingQuery == null) {
+            if (splitByCol != null) {
+              inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
+                      + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
+            } else {
+              inputBoundingQuery = "";
+            }
+          }
+        }
+        DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+            inputQuery, inputBoundingQuery);
+        new DBConfiguration(job.getConfiguration()).setInputOrderBy(
+            splitByCol);
+      }
+
+      LOG.debug("Using table class: " + tableClassName);
+      job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(),
+          tableClassName);
+
+      job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
+          options.getInlineLobLimit());
+
+      LOG.debug("Using InputFormat: " + inputFormatClass);
+      job.setInputFormatClass(inputFormatClass);
+    } finally {
+      try {
+        mgr.close();
+      } catch (SQLException sqlE) {
+        LOG.warn("Error closing connection: " + sqlE);
+      }
+    }
+  }
+}
+
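
For illustration only, the free-form query path above can be worked through for a hypothetical query and split column (assuming SUBSTITUTE_TOKEN is the usual "$CONDITIONS" placeholder); the table and column names are made up.

public class BoundingQueryExample {
  public static void main(String[] args) {
    String inputQuery = "SELECT id, name FROM employees WHERE $CONDITIONS";
    String splitByCol = "id";

    // The placeholder is neutralized before deriving split bounds.
    String sanitizedQuery = inputQuery.replace("$CONDITIONS", " (1 = 1) ");

    // Default bounding query when neither the user nor the ConnManager
    // supplies one.
    String inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
        + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";

    System.out.println(inputBoundingQuery);
    // SELECT MIN(id), MAX(id) FROM (SELECT id, name FROM employees
    //   WHERE  (1 = 1) ) AS t1
  }
}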

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.ProcessingException;
+
+/**
+ * OutputFormat that produces a RecordWriter which instantiates
+ * a FieldMapProcessor that will process FieldMappable
+ * output keys.
+ *
+ * <p>The output value is ignored.</p>
+ *
+ * <p>The FieldMapProcessor implementation may do any arbitrary
+ * processing on the object. For example, it may write an object
+ * to HBase, etc.</p>
+ *
+ * <p>If the FieldMapProcessor implementation also implements
+ * Closeable, it will be close()'d in the RecordWriter's close()
+ * method.</p>
+ *
+ * <p>If the FMP implements Configurable, it will be configured
+ * correctly via ReflectionUtils.</p>
+ */
+public class DelegatingOutputFormat<K extends FieldMappable, V>
+    extends OutputFormat<K, V> {
+
+  /** conf key: the FieldMapProcessor class to instantiate. */
+  public static final String DELEGATE_CLASS_KEY =
+      "sqoop.output.delegate.field.map.processor.class";
+
+  @Override
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    if (null == conf.get(DELEGATE_CLASS_KEY)) {
+      throw new IOException("Delegate FieldMapProcessor class is not set.");
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new NullOutputCommitter();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new DelegatingRecordWriter(context);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+  }
+
+  /**
+   * RecordWriter that hands each output key to the configured
+   * FieldMapProcessor; the output value is ignored.
+   */
+  public class DelegatingRecordWriter extends RecordWriter<K, V> {
+
+    private Configuration conf;
+
+    private FieldMapProcessor mapProcessor;
+
+    public DelegatingRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException {
+
+      this.conf = context.getConfiguration();
+
+      @SuppressWarnings("unchecked")
+      Class<? extends FieldMapProcessor> procClass =
+          (Class<? extends FieldMapProcessor>)
+          conf.getClass(DELEGATE_CLASS_KEY, null);
+      this.mapProcessor = ReflectionUtils.newInstance(procClass, this.conf);
+    }
+
+    protected Configuration getConf() {
+      return this.conf;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void close(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      if (mapProcessor instanceof Closeable) {
+        ((Closeable) mapProcessor).close();
+      }
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void write(K key, V value)
+        throws InterruptedException, IOException {
+      try {
+        mapProcessor.accept(key);
+      } catch (ProcessingException pe) {
+        throw new IOException(pe);
+      }
+    }
+  }
+}

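The DelegatingOutputFormat above leaves the processor side of the contract implicit. A minimal sketch of a FieldMapProcessor it could delegate to is given here; the class name, the println output, and the assumption that FieldMapProcessor declares accept(FieldMappable) and that FieldMappable exposes getFieldMap() are illustrative, not part of this commit:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Map;
    import com.cloudera.sqoop.lib.FieldMapProcessor;
    import com.cloudera.sqoop.lib.FieldMappable;
    import com.cloudera.sqoop.lib.ProcessingException;

    // Hypothetical processor: prints each exported record's field map.
    public class LoggingFieldMapProcessor
        implements FieldMapProcessor, Closeable {

      @Override
      public void accept(FieldMappable record)
          throws IOException, ProcessingException {
        // getFieldMap() is assumed to expose column-name -> value pairs.
        for (Map.Entry<String, Object> entry
            : record.getFieldMap().entrySet()) {
          System.out.println(entry.getKey() + " = " + entry.getValue());
        }
      }

      @Override
      public void close() throws IOException {
        // Nothing to release here; an HBase-backed processor would flush
        // and close its table handle in this method.
      }
    }

The processor is selected through the key defined above, for example
conf.setClass(DelegatingOutputFormat.DELEGATE_CLASS_KEY,
LoggingFieldMapProcessor.class, FieldMapProcessor.class).
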
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+
+/**
+ * This class uses JDBC batch mode to execute the underlying statements,
+ * instead of the single multi-row INSERT statement used by its superclass.
+ */
+public class ExportBatchOutputFormat<K extends SqoopRecord, V>
+    extends ExportOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(ExportBatchOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new ExportBatchRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class ExportBatchRecordWriter extends ExportRecordWriter {
+
+    public ExportBatchRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected boolean isBatchExec() {
+      // We use batches here.
+      return true;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+      }
+
+      // Inject the record parameters into the VALUES clauses.
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, 0);
+        stmt.addBatch();
+      }
+
+      return stmt;
+    }
+
+    /**
+     * @return a single-row INSERT statement; in batch mode each buffered
+     * record is bound to this statement via addBatch(), so numRows does
+     * not expand the VALUES clause.
+     */
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + tableName + " ");
+
+      int numSlots;
+      if (this.columnNames != null) {
+        numSlots = this.columnNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : columnNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = this.columnCount; // set if columnNames is null.
+      }
+
+      sb.append("VALUES ");
+
+      // generates the (?, ?, ?...).
+      sb.append("(");
+      for (int i = 0; i < numSlots; i++) {
+        if (i != 0) {
+          sb.append(", ");
+        }
+
+        sb.append("?");
+      }
+      sb.append(")");
+
+      return sb.toString();
+    }
+  }
+}

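For contrast with the superclass: ExportOutputFormat (later in this commit) expands the VALUES clause once per buffered record and binds every record into one statement, while ExportBatchOutputFormat prepares a single-row statement and calls addBatch() once per record. With two buffered records and illustrative columns (id, name), the multi-row form is roughly

    INSERT INTO mytable (id, name) VALUES (?, ?), (?, ?)

while the batch form prepared by this class is

    INSERT INTO mytable (id, name) VALUES (?, ?)

with one parameter set added per addBatch() call. Batch mode can suit JDBC drivers that reject multi-row VALUES lists or that optimize executeBatch() internally; the table and column names above are placeholders.
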
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * InputFormat that generates a user-defined number of splits to inject data
+ * into the database.
+ */
+public class ExportInputFormat
+   extends CombineFileInputFormat<LongWritable, Object> {
+
+  public static final Log LOG =
+     LogFactory.getLog(ExportInputFormat.class.getName());
+
+  public static final int DEFAULT_NUM_MAP_TASKS = 4;
+
+  public ExportInputFormat() {
+  }
+
+  /**
+   * @return the number of bytes across all files in the job.
+   */
+  private long getJobSize(JobContext job) throws IOException {
+    List<FileStatus> stats = listStatus(job);
+    long count = 0;
+    for (FileStatus stat : stats) {
+      count += stat.getLen();
+    }
+
+    return count;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    // Set the max split size based on the number of map tasks we want.
+    long numTasks = getNumMapTasks(job);
+    long numFileBytes = getJobSize(job);
+    long maxSplitSize = numFileBytes / numTasks;
+
+    setMaxSplitSize(maxSplitSize);
+
+    LOG.debug("Target numMapTasks=" + numTasks);
+    LOG.debug("Total input bytes=" + numFileBytes);
+    LOG.debug("maxSplitSize=" + maxSplitSize);
+
+    List<InputSplit> splits =  super.getSplits(job);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated splits:");
+      for (InputSplit split : splits) {
+        LOG.debug("  " + split);
+      }
+    }
+    return splits;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+
+    CombineFileSplit combineSplit = (CombineFileSplit) split;
+
+    // Use CombineFileRecordReader since it can handle CombineFileSplits
+    // and instantiates a delegate RecordReader for each chunk of the
+    // split; the delegate used here is CombineShimRecordReader.
+    RecordReader rr = new CombineFileRecordReader(combineSplit, context,
+        CombineShimRecordReader.class);
+
+    return rr;
+  }
+
+  /**
+   * Allows the user to control the number of map tasks used for this
+   * export job.
+   */
+  public static void setNumMapTasks(JobContext job, int numTasks) {
+    job.getConfiguration().setInt(ExportJobBase.EXPORT_MAP_TASKS_KEY, numTasks);
+  }
+
+  /**
+   * @return the number of map tasks to use in this export job.
+   */
+  public static int getNumMapTasks(JobContext job) {
+    return job.getConfiguration().getInt(ExportJobBase.EXPORT_MAP_TASKS_KEY,
+        DEFAULT_NUM_MAP_TASKS);
+  }
+
+}

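The static accessors above carry the requested parallelism through the job configuration rather than through the InputFormat instance. A driver-side sketch, assuming a Configuration named conf that is already set up for the export (the path and task count are illustrative; ExportJobBase below performs this wiring in the real job):

    // imports assumed: org.apache.hadoop.mapreduce.Job,
    // org.apache.hadoop.fs.Path,
    // org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    Job job = new Job(conf);
    job.setInputFormatClass(ExportInputFormat.class);
    ExportInputFormat.setNumMapTasks(job, 8); // aim for roughly 8 map tasks
    FileInputFormat.addInputPath(job, new Path("/user/example/export-dir"));
    // getSplits() then caps each split at (total input bytes / 8).

Because the cap is computed as totalBytes / numTasks, the actual number of splits can still differ from the request when the input consists of few or unevenly sized files.
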
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.util.PerfCounters;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Base class for running an export MapReduce job.
+ */
+public class ExportJobBase extends JobBase {
+
+  /**
+   * The (inferred) type of a file or group of files.
+   */
+  public enum FileType {
+    SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+  }
+
+  public static final Log LOG = LogFactory.getLog(
+      ExportJobBase.class.getName());
+
+  /** What SqoopRecord class to use to read a record for export. */
+  public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
+      "sqoop.mapreduce.export.table.class";
+
+  /**
+   * What column of the table to use for the WHERE clause of
+   * an updating export.
+   */
+  public static final String SQOOP_EXPORT_UPDATE_COL_KEY =
+      "sqoop.mapreduce.export.update.col";
+
+  /** Number of map tasks to use for an export. */
+  public static final String EXPORT_MAP_TASKS_KEY =
+      "sqoop.mapreduce.export.map.tasks";
+
+  protected ExportJobContext context;
+
+  public ExportJobBase(final ExportJobContext ctxt) {
+    this(ctxt, null, null, null);
+  }
+
+  public ExportJobBase(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
+    this.context = ctxt;
+  }
+
+  /**
+   * @return true if p is a SequenceFile, or a directory containing
+   * SequenceFiles.
+   */
+  public static boolean isSequenceFiles(Configuration conf, Path p)
+      throws IOException {
+    return getFileType(conf, p) == FileType.SEQUENCE_FILE;
+  }
+
+  /**
+   * @return the type of the file represented by p (or the files in p, if a
+   * directory)
+   */
+  public static FileType getFileType(Configuration conf, Path p)
+      throws IOException {
+    FileSystem fs = p.getFileSystem(conf);
+
+    try {
+      FileStatus stat = fs.getFileStatus(p);
+
+      if (null == stat) {
+        // Couldn't get the item.
+        LOG.warn("Input path " + p + " does not exist");
+        return FileType.UNKNOWN;
+      }
+
+      if (stat.isDir()) {
+        FileStatus [] subitems = fs.listStatus(p);
+        if (subitems == null || subitems.length == 0) {
+          LOG.warn("Input path " + p + " contains no files");
+          return FileType.UNKNOWN; // empty dir.
+        }
+
+        // Pick a child entry to examine instead.
+        boolean foundChild = false;
+        for (int i = 0; i < subitems.length; i++) {
+          stat = subitems[i];
+          if (!stat.isDir() && !stat.getPath().getName().startsWith("_")) {
+            foundChild = true;
+            break; // This item is a visible file. Check it.
+          }
+        }
+
+        if (!foundChild) {
+          stat = null; // Couldn't find a reasonable candidate.
+        }
+      }
+
+      if (null == stat) {
+        LOG.warn("null FileStatus object in isSequenceFiles(); "
+            + "assuming false.");
+        return FileType.UNKNOWN;
+      }
+
+      Path target = stat.getPath();
+      return fromMagicNumber(target, conf);
+    } catch (FileNotFoundException fnfe) {
+      LOG.warn("Input path " + p + " does not exist");
+      return FileType.UNKNOWN; // doesn't exist!
+    }
+  }
+
+  /**
+   * @param file a file to test.
+   * @param conf the configuration used to access the file's filesystem.
+   * @return the FileType indicated by the file's header magic number.
+   */
+  private static FileType fromMagicNumber(Path file, Configuration conf) {
+    // Test target's header to see if it contains magic numbers indicating its
+    // file type
+    byte [] header = new byte[3];
+    FSDataInputStream is = null;
+    try {
+      FileSystem fs = file.getFileSystem(conf);
+      is = fs.open(file);
+      is.readFully(header);
+    } catch (IOException ioe) {
+      // Error reading header or EOF; assume unknown
+      LOG.warn("IOException checking input file header: " + ioe);
+      return FileType.UNKNOWN;
+    } finally {
+      try {
+        if (null != is) {
+          is.close();
+        }
+      } catch (IOException ioe) {
+        // ignore; closing.
+        LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
+      }
+    }
+
+    if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
+      return FileType.SEQUENCE_FILE;
+    }
+    if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
+      return FileType.AVRO_DATA_FILE;
+    }
+    return FileType.UNKNOWN;
+  }
+
+  /**
+   * @return the Path to the files we are going to export to the db.
+   */
+  protected Path getInputPath() throws IOException {
+    Path inputPath = new Path(context.getOptions().getExportDir());
+    Configuration conf = options.getConf();
+    inputPath = inputPath.makeQualified(FileSystem.get(conf));
+    return inputPath;
+  }
+
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+    FileInputFormat.addInputPath(job, getInputPath());
+  }
+
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
+    if (null == configuredIF) {
+      return ExportInputFormat.class;
+    } else {
+      return configuredIF;
+    }
+  }
+
+  @Override
+  protected Class<? extends OutputFormat> getOutputFormatClass()
+      throws ClassNotFoundException {
+    Class<? extends OutputFormat> configuredOF = super.getOutputFormatClass();
+    if (null == configuredOF) {
+      if (!options.isBatchMode()) {
+        return ExportOutputFormat.class;
+      } else {
+        return ExportBatchOutputFormat.class;
+      }
+    } else {
+      return configuredOF;
+    }
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+
+    job.setMapperClass(getMapperClass());
+
+    // Concurrent writes of the same records would be problematic.
+    ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
+
+    job.setMapOutputKeyClass(SqoopRecord.class);
+    job.setMapOutputValueClass(NullWritable.class);
+  }
+
+  @Override
+  protected int configureNumTasks(Job job) throws IOException {
+    int numMaps = super.configureNumTasks(job);
+    job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numMaps);
+    return numMaps;
+  }
+
+  @Override
+  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+      InterruptedException {
+
+    PerfCounters perfCounters = new PerfCounters();
+    perfCounters.startClock();
+
+    boolean success = job.waitForCompletion(true);
+    perfCounters.stopClock();
+
+    Counters jobCounters = job.getCounters();
+    // If the job has been retired, these may be unavailable.
+    if (null == jobCounters) {
+      displayRetiredJobNotice(LOG);
+    } else {
+      perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
+        .findCounter("HDFS_BYTES_READ").getValue());
+      LOG.info("Transferred " + perfCounters.toString());
+      long numRecords =  ConfigurationHelper.getNumMapInputRecords(job);
+      LOG.info("Exported " + numRecords + " records.");
+    }
+
+    return success;
+  }
+
+  /**
+   * Run an export job to dump a table from HDFS to a database. If a staging
+   * table is specified and the connection manager supports staging of data,
+   * the export will first populate the staging table and then migrate the
+   * data to the target table.
+   * @throws IOException if the export job encounters an IO error
+   * @throws ExportException if the job fails unexpectedly or is misconfigured.
+   */
+  public void runExport() throws ExportException, IOException {
+
+    ConnManager cmgr = context.getConnManager();
+    SqoopOptions options = context.getOptions();
+    Configuration conf = options.getConf();
+
+    String outputTableName = context.getTableName();
+    String stagingTableName = context.getOptions().getStagingTableName();
+
+    String tableName = outputTableName;
+    boolean stagingEnabled = false;
+    if (stagingTableName != null) { // user has specified the staging table
+      if (cmgr.supportsStagingForExport()) {
+        LOG.info("Data will be staged in the table: " + stagingTableName);
+        tableName = stagingTableName;
+        stagingEnabled = true;
+      } else {
+        throw new ExportException("The active connection manager ("
+            + cmgr.getClass().getCanonicalName()
+            + ") does not support staging of data for export. "
+            + "Please retry without specifying the --staging-table option.");
+      }
+    }
+
+    String tableClassName =
+        new TableClassName(options).getClassForTable(outputTableName);
+    String ormJarFile = context.getJarFile();
+
+    LOG.info("Beginning export of " + outputTableName);
+    loadJars(conf, ormJarFile, tableClassName);
+
+    if (stagingEnabled) {
+      // Prepare the staging table
+      if (options.doClearStagingTable()) {
+        try {
+          // Delete all records from staging table
+          cmgr.deleteAllRecords(stagingTableName);
+        } catch (SQLException ex) {
+          throw new ExportException(
+              "Failed to empty staging table before export run", ex);
+        }
+      } else {
+        // User has not explicitly specified the clear staging table option.
+        // Assert that the staging table is empty.
+        try {
+          long rowCount = cmgr.getTableRowCount(stagingTableName);
+          if (rowCount != 0L) {
+            throw new ExportException("The specified staging table ("
+                + stagingTableName + ") is not empty. To force deletion of "
+                + "its data, please retry with --clear-staging-table option.");
+          }
+        } catch (SQLException ex) {
+          throw new ExportException(
+              "Failed to count data rows in staging table: "
+                  + stagingTableName, ex);
+        }
+      }
+    }
+
+    try {
+      Job job = new Job(conf);
+
+      // Set the external jar to use for the job.
+      job.getConfiguration().set("mapred.jar", ormJarFile);
+
+      configureInputFormat(job, tableName, tableClassName, null);
+      configureOutputFormat(job, tableName, tableClassName);
+      configureMapper(job, tableName, tableClassName);
+      configureNumTasks(job);
+      cacheJars(job, context.getConnManager());
+      setJob(job);
+      boolean success = runJob(job);
+      if (!success) {
+        throw new ExportException("Export job failed!");
+      }
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      unloadJars();
+    }
+
+    // Unstage the data if needed
+    if (stagingEnabled) {
+      // Migrate data from staging table to the output table
+      try {
+        LOG.info("Starting to migrate data from staging table to destination.");
+        cmgr.migrateData(stagingTableName, outputTableName);
+      } catch (SQLException ex) {
+        LOG.error("Failed to move data from staging table ("
+            + stagingTableName + ") to target table ("
+            + outputTableName + ")", ex);
+        throw new ExportException(
+            "Failed to move data from staging table", ex);
+      }
+    }
+  }
+
+  /**
+   * @return true if the input directory contains SequenceFiles.
+   * @deprecated use {@link #getInputFileType()} instead
+   */
+  @Deprecated
+  protected boolean inputIsSequenceFiles() {
+    try {
+      return isSequenceFiles(
+          context.getOptions().getConf(), getInputPath());
+    } catch (IOException ioe) {
+      LOG.warn("Could not check file format for export; assuming text");
+      return false;
+    }
+  }
+
+  protected FileType getInputFileType() {
+    try {
+      return getFileType(context.getOptions().getConf(), getInputPath());
+    } catch (IOException ioe) {
+      return FileType.UNKNOWN;
+    }
+  }
+}

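The header probe in fromMagicNumber() keys off the standard magic numbers: SequenceFiles begin with the ASCII bytes 'S','E','Q' and Avro data files with 'O','b','j'. The same check can be reproduced against a local copy of a file as a quick sanity test; this is a sketch only, and the file name is a placeholder:

    import java.io.FileInputStream;
    import java.io.IOException;

    public class MagicNumberCheck {
      public static void main(String[] args) throws IOException {
        byte[] header = new byte[3];
        FileInputStream in = null;
        try {
          in = new FileInputStream("part-m-00000"); // placeholder file name
          if (in.read(header) != 3) {
            System.out.println("File too short to classify");
            return;
          }
        } finally {
          if (in != null) {
            in.close();
          }
        }
        String magic = new String(header, "US-ASCII");
        if ("SEQ".equals(magic)) {
          System.out.println("SequenceFile");
        } else if ("Obj".equals(magic)) {
          System.out.println("Avro data file");
        } else {
          System.out.println("Unknown");
        }
      }
    }

The staging logic in runExport() corresponds to the --staging-table and --clear-staging-table options referenced in its error messages: when staging is enabled, the MapReduce job writes into the staging table, and the data is migrated to the target table only after the job completes successfully.
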
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Insert the emitted keys as records into a database table.
+ * This supports a configurable "spill threshold" at which
+ * point intermediate transactions are committed.
+ *
+ * Record objects are buffered before actually performing the INSERT
+ * statements; this requires that the key implement the
+ * SqoopRecord interface.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public class ExportOutputFormat<K extends SqoopRecord, V>
+    extends AsyncSqlOutputFormat<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    DBConfiguration dbConf = new DBConfiguration(conf);
+
+    // Sanity check all the configuration values we need.
+    if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
+      throw new IOException("Database connection URL is not set.");
+    } else if (null == dbConf.getOutputTableName()) {
+      throw new IOException("Table name is not set for export");
+    } else if (null == dbConf.getOutputFieldNames()
+        && 0 == dbConf.getOutputFieldCount()) {
+      throw new IOException(
+          "Output field names are null and zero output field count set.");
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new ExportRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class ExportRecordWriter extends AsyncSqlRecordWriter<K, V> {
+
+    protected String tableName;
+    protected String [] columnNames; // The columns to insert into.
+    protected int columnCount; // If columnNames is null, holds the column count.
+
+    public ExportRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+
+      Configuration conf = getConf();
+
+      DBConfiguration dbConf = new DBConfiguration(conf);
+      tableName = dbConf.getOutputTableName();
+      columnNames = dbConf.getOutputFieldNames();
+      columnCount = dbConf.getOutputFieldCount();
+    }
+
+    /**
+     * @return the name of the table we are inserting into.
+     */
+    protected final String getTableName() {
+      return tableName;
+    }
+
+    /**
+     * @return the list of columns we are updating.
+     */
+    protected final String [] getColumnNames() {
+      if (null == columnNames) {
+        return null;
+      } else {
+        return Arrays.copyOf(columnNames, columnNames.length);
+      }
+    }
+
+    /**
+     * @return the number of columns we are updating.
+     */
+    protected final int getColumnCount() {
+      return columnCount;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+      }
+
+      // Inject the record parameters into the VALUES clauses.
+      int position = 0;
+      for (SqoopRecord record : userRecords) {
+        position += record.write(stmt, position);
+      }
+
+      return stmt;
+    }
+
+    /**
+     * @return an INSERT statement suitable for inserting 'numRows' rows.
+     */
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + tableName + " ");
+
+      int numSlots;
+      if (this.columnNames != null) {
+        numSlots = this.columnNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : columnNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = this.columnCount; // set if columnNames is null.
+      }
+
+      sb.append("VALUES ");
+
+      // generates the (?, ?, ?...) used for each row.
+      StringBuilder sbRow = new StringBuilder();
+      sbRow.append("(");
+      for (int i = 0; i < numSlots; i++) {
+        if (i != 0) {
+          sbRow.append(", ");
+        }
+
+        sbRow.append("?");
+      }
+      sbRow.append(")");
+
+      // Now append that numRows times.
+      for (int i = 0; i < numRows; i++) {
+        if (i != 0) {
+          sb.append(", ");
+        }
+
+        sb.append(sbRow);
+      }
+
+      return sb.toString();
+    }
+  }
+}
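To make the generated statement concrete: getInsertStatement(3) with columnNames = {id, name} (illustrative names, as is the table name) produces

    INSERT INTO mytable (id, name) VALUES (?, ?), (?, ?), (?, ?)

and getPreparedStatement() binds each buffered record into its own group of slots, since record.write(stmt, position) returns the number of fields written, which the loop adds to position before binding the next record.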