You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC

svn commit: r1190489 [5/6] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.hbase.HBasePutProcessor;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Runs an HBase import via DataDrivenDBInputFormat to the HBasePutProcessor
+ * in the DelegatingOutputFormat.
+ */
+public class HBaseImportJob extends DataDrivenImportJob {
+
+  public static final Log LOG = LogFactory.getLog(
+      HBaseImportJob.class.getName());
+
+  public HBaseImportJob(final SqoopOptions opts,
+      final ImportJobContext importContext) {
+    super(opts, importContext.getInputFormat(), importContext);
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+    job.setOutputKeyClass(SqoopRecord.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setMapperClass(getMapperClass());
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    return HBaseImportMapper.class;
+  }
+
+  @Override
+  protected Class<? extends OutputFormat> getOutputFormatClass()
+      throws ClassNotFoundException {
+    return DelegatingOutputFormat.class;
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+
+    // Use the DelegatingOutputFormat with the HBasePutProcessor.
+    job.setOutputFormatClass(getOutputFormatClass());
+
+    Configuration conf = job.getConfiguration();
+    conf.setClass("sqoop.output.delegate.field.map.processor.class",
+        HBasePutProcessor.class,
+        FieldMapProcessor.class);
+
+    // Set the HBase parameters (table, column family, row key):
+    conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
+    conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
+
+    // What column of the input becomes the row key?
+    String rowKeyCol = options.getHBaseRowKeyColumn();
+    if (null == rowKeyCol) {
+      // User didn't explicitly set one. If there's a split-by column set,
+      // use that.
+      rowKeyCol = options.getSplitByCol();
+    }
+
+    if (null == rowKeyCol) {
+      // No split-by column is explicitly set.
+      // If the table has a primary key, use that.
+      ConnManager manager = getContext().getConnManager();
+      rowKeyCol = manager.getPrimaryKey(tableName);
+    }
+
+    if (null == rowKeyCol) {
+      // Give up here if this is still unset.
+      throw new IOException("Could not determine the row-key column. "
+          + "Use --hbase-row-key to specify the input column that "
+          + "names each row.");
+    }
+
+    conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, rowKeyCol);
+  }
+
+  @Override
+  /** Create the target HBase table before running the job. */
+  protected void jobSetup(Job job) throws IOException, ImportException {
+    Configuration conf = job.getConfiguration();
+    String tableName = conf.get(HBasePutProcessor.TABLE_NAME_KEY);
+    String familyName = conf.get(HBasePutProcessor.COL_FAMILY_KEY);
+
+    if (null == tableName) {
+      throw new ImportException(
+          "Import to HBase error: Table name not specified");
+    }
+
+    if (null == familyName) {
+      throw new ImportException(
+          "Import to HBase error: Column family not specified");
+    }
+
+    // Add HBase configuration files to this conf object.
+    HBaseConfiguration.addHbaseResources(conf);
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
+
+    // Check to see if the table exists.
+    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+    byte [] familyBytes = Bytes.toBytes(familyName);
+    HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
+    if (!admin.tableExists(tableName)) {
+      if (options.getCreateHBaseTable()) {
+        // Create the table.
+        LOG.info("Creating missing HBase table " + tableName);
+        tableDesc.addFamily(colDesc);
+        admin.createTable(tableDesc);
+      } else {
+        LOG.warn("Could not find HBase table " + tableName);
+        LOG.warn("This job may fail. Either explicitly create the table,");
+        LOG.warn("or re-run with --hbase-create-table.");
+      }
+    } else if (!tableDesc.hasFamily(familyBytes)) {
+      if (options.getCreateHBaseTable()) {
+        // Create the column family.
+        LOG.info("Creating missing column family " + familyName);
+        admin.disableTable(tableName);
+        admin.addColumn(tableName, colDesc);
+        admin.enableTable(tableName);
+      } else {
+        LOG.warn("Could not find column family " + familyName + " in table "
+            + tableName);
+        LOG.warn("This job may fail. Either create the column family,");
+        LOG.warn("or re-run with --hbase-create-table.");
+      }
+    }
+
+    // Make sure HBase libraries are shipped as part of the job.
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.addDependencyJars(conf, HTable.class);
+
+    super.jobSetup(job);
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by writing them to HBase via the DelegatingOutputFormat
+ * and the HBasePutProcessor.
+ */
+public class HBaseImportMapper
+    extends AutoProgressMapper
+    <LongWritable, SqoopRecord, SqoopRecord, NullWritable> {
+
+  @Override
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+    context.write(val, NullWritable.get());
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.util.PerfCounters;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.io.CodecMap;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Base class for running an import MapReduce job.
+ * Allows dependency injection, etc, for easy customization of import job types.
+ */
+public class ImportJobBase extends JobBase {
+
+  private ImportJobContext context;
+
+  public static final Log LOG = LogFactory.getLog(
+      ImportJobBase.class.getName());
+
+  public ImportJobBase() {
+    this(null);
+  }
+
+  public ImportJobBase(final SqoopOptions opts) {
+    this(opts, null, null, null, null);
+  }
+
+  public ImportJobBase(final SqoopOptions opts,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ImportJobContext context) {
+    super(opts, mapperClass, inputFormatClass, outputFormatClass);
+    this.context = context;
+  }
+
+  /**
+   * Configure the output format to use for the job.
+   */
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+
+    job.setOutputFormatClass(getOutputFormatClass());
+
+    if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
+      job.getConfiguration().set("mapred.output.value.class", tableClassName);
+    }
+
+    if (options.shouldUseCompression()) {
+      FileOutputFormat.setCompressOutput(job, true);
+
+      String codecName = options.getCompressionCodec();
+      Class<? extends CompressionCodec> codecClass;
+      if (codecName == null) {
+        codecClass = GzipCodec.class;
+      } else {
+        Configuration conf = job.getConfiguration();
+        codecClass = CodecMap.getCodec(codecName, conf).getClass();
+      }
+      FileOutputFormat.setOutputCompressorClass(job, codecClass);
+
+      if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
+        SequenceFileOutputFormat.setOutputCompressionType(job,
+            CompressionType.BLOCK);
+      }
+    }
+
+    Path outputPath = context.getDestination();
+    FileOutputFormat.setOutputPath(job, outputPath);
+  }
+
+  /**
+   * Actually run the MapReduce job.
+   */
+  @Override
+  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+      InterruptedException {
+
+    PerfCounters perfCounters = new PerfCounters();
+    perfCounters.startClock();
+
+    boolean success = job.waitForCompletion(true);
+    perfCounters.stopClock();
+
+    Counters jobCounters = job.getCounters();
+    // If the job has been retired, these may be unavailable.
+    if (null == jobCounters) {
+      displayRetiredJobNotice(LOG);
+    } else {
+      perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
+        .findCounter("HDFS_BYTES_WRITTEN").getValue());
+      LOG.info("Transferred " + perfCounters.toString());
+      long numRecords = ConfigurationHelper.getNumMapOutputRecords(job);
+      LOG.info("Retrieved " + numRecords + " records.");
+    }
+    return success;
+  }
+
+
+  /**
+   * Run an import job to read a table in to HDFS.
+   *
+   * @param tableName  the database table to read; may be null if a free-form
+   * query is specified in the SqoopOptions, and the ImportJobBase subclass
+   * supports free-form queries.
+   * @param ormJarFile the Jar file to insert into the dcache classpath.
+   * (may be null)
+   * @param splitByCol the column of the database table to use to split
+   * the import
+   * @param conf A fresh Hadoop Configuration to use to build an MR job.
+   * @throws IOException if the job encountered an IO problem
+   * @throws ImportException if the job failed unexpectedly or was
+   * misconfigured.
+   */
+  public void runImport(String tableName, String ormJarFile, String splitByCol,
+      Configuration conf) throws IOException, ImportException {
+
+    if (null != tableName) {
+      LOG.info("Beginning import of " + tableName);
+    } else {
+      LOG.info("Beginning query import.");
+    }
+
+    String tableClassName =
+        new TableClassName(options).getClassForTable(tableName);
+    loadJars(conf, ormJarFile, tableClassName);
+
+    try {
+      Job job = new Job(conf);
+
+      // Set the external jar to use for the job.
+      job.getConfiguration().set("mapred.jar", ormJarFile);
+
+      configureInputFormat(job, tableName, tableClassName, splitByCol);
+      configureOutputFormat(job, tableName, tableClassName);
+      configureMapper(job, tableName, tableClassName);
+      configureNumTasks(job);
+      cacheJars(job, getContext().getConnManager());
+
+      jobSetup(job);
+      setJob(job);
+      boolean success = runJob(job);
+      if (!success) {
+        throw new ImportException("Import job failed!");
+      }
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      unloadJars();
+    }
+  }
+
+  /**
+   * Open-ended "setup" routine that is called after the job is configured
+   * but just before it is submitted to MapReduce. Subclasses may override
+   * if necessary.
+   */
+  protected void jobSetup(Job job) throws IOException, ImportException {
+  }
+
+  protected ImportJobContext getContext() {
+    return context;
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * Run an export using JDBC (JDBC-based ExportOutputFormat).
+ */
+public class JdbcExportJob extends ExportJobBase {
+
+  private FileType fileType;
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcExportJob.class.getName());
+
+  public JdbcExportJob(final ExportJobContext context) {
+    super(context);
+  }
+
+  public JdbcExportJob(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+  }
+
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    fileType = getInputFileType();
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+    if (fileType == FileType.AVRO_DATA_FILE) {
+      LOG.debug("Configuring for Avro export");
+      ConnManager connManager = context.getConnManager();
+      Map<String, Integer> columnTypeInts =
+        connManager.getColumnTypes(tableName, options.getSqlQuery());
+      MapWritable columnTypes = new MapWritable();
+      for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+        Text columnName = new Text(e.getKey());
+        Text columnText = new Text(connManager.toJavaType(e.getValue()));
+        columnTypes.put(columnName, columnText);
+      }
+      DefaultStringifier.store(job.getConfiguration(), columnTypes,
+          AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+    }
+
+  }
+
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    if (fileType == FileType.AVRO_DATA_FILE) {
+      return AvroInputFormat.class;
+    }
+    return super.getInputFormatClass();
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    switch (fileType) {
+      case SEQUENCE_FILE:
+        return SequenceFileExportMapper.class;
+      case AVRO_DATA_FILE:
+        return AvroExportMapper.class;
+      case UNKNOWN:
+      default:
+        return TextExportMapper.class;
+    }
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    ConnManager mgr = context.getConnManager();
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            username, options.getPassword());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
+      }
+      DBOutputFormat.setOutput(job, tableName, colNames);
+
+      job.setOutputFormatClass(getOutputFormatClass());
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
+  }
+
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
+ */
+public class JdbcUpdateExportJob extends ExportJobBase {
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcUpdateExportJob.class.getName());
+
+  /**
+   * Return an instance of the UpdateOutputFormat class object loaded
+   * from the shim jar.
+   */
+  private static Class<? extends OutputFormat> getUpdateOutputFormat()
+       throws IOException {
+    return UpdateOutputFormat.class;
+  }
+
+  public JdbcUpdateExportJob(final ExportJobContext context)
+      throws IOException {
+    super(context, null, null, getUpdateOutputFormat());
+  }
+
+  public JdbcUpdateExportJob(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    if (inputIsSequenceFiles()) {
+      return SequenceFileExportMapper.class;
+    } else {
+      return TextExportMapper.class;
+    }
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    ConnManager mgr = context.getConnManager();
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            username, options.getPassword());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
+      }
+
+      if (null == colNames) {
+        throw new IOException(
+            "Export column names could not be determined for " + tableName);
+      }
+
+      String updateKeyColumns = options.getUpdateKeyCol();
+      if (null == updateKeyColumns) {
+        throw new IOException("Update key column not set in export job");
+      }
+      // Update key columns lookup and removal
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      Set<String> updateKeysUppercase = new HashSet<String>();
+      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateKey = stok.nextToken().trim();
+        if (nextUpdateKey.length() > 0) {
+          updateKeys.add(nextUpdateKey);
+          updateKeysUppercase.add(nextUpdateKey.toUpperCase());
+        }  else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyColumns + "'");
+        }
+      }
+
+      if (updateKeys.size() == 0) {
+        throw new IOException("Unpdate key columns not valid in export job");
+      }
+
+      // Make sure we strip out the key column from this list.
+      String [] outColNames = new String[colNames.length - updateKeys.size()];
+      int j = 0;
+      for (int i = 0; i < colNames.length; i++) {
+        if (!updateKeysUppercase.contains(colNames[i].toUpperCase())) {
+          outColNames[j++] = colNames[i];
+        }
+      }
+      DBOutputFormat.setOutput(job, tableName, outColNames);
+
+      job.setOutputFormatClass(getOutputFormatClass());
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
+ */
+public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcUpsertExportJob.class.getName());
+
+  public JdbcUpsertExportJob(final ExportJobContext context,
+      final Class<? extends OutputFormat> outputFormatClass)
+      throws IOException {
+    super(context, null, null, outputFormatClass);
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    ConnManager mgr = context.getConnManager();
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            username, options.getPassword());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
+      }
+      if (null == colNames) {
+        throw new IOException(
+            "Export column names could not be determined for " + tableName);
+      }
+      DBOutputFormat.setOutput(job, tableName, colNames);
+
+      String updateKeyColumns = options.getUpdateKeyCol();
+      if (null == updateKeyColumns) {
+        throw new IOException("Update key column not set in export job");
+      }
+      // Update key columns lookup and removal
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateKey = stok.nextToken().trim();
+        if (nextUpdateKey.length() > 0) {
+          updateKeys.add(nextUpdateKey);
+        } else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyColumns + "'");
+        }
+      }
+
+      if (updateKeys.size() == 0) {
+        throw new IOException("Unpdate key columns not valid in export job");
+      }
+
+      job.setOutputFormatClass(getOutputFormatClass());
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.tool.SqoopTool;
+import com.cloudera.sqoop.util.ClassLoaderStack;
+import com.cloudera.sqoop.util.Jars;
+
+/**
+ * Base class for configuring and running a MapReduce job.
+ * Allows dependency injection, etc, for easy customization of import job types.
+ */
+public class JobBase {
+
+  public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
+
+  protected SqoopOptions options;
+  protected Class<? extends Mapper> mapperClass;
+  protected Class<? extends InputFormat> inputFormatClass;
+  protected Class<? extends OutputFormat> outputFormatClass;
+
+  private Job mrJob;
+
+  private ClassLoader prevClassLoader = null;
+
+  public JobBase() {
+    this(null);
+  }
+
+  public JobBase(final SqoopOptions opts) {
+    this(opts, null, null, null);
+  }
+
+  public JobBase(final SqoopOptions opts,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+
+    this.options = opts;
+    this.mapperClass = mapperClass;
+    this.inputFormatClass = inputFormatClass;
+    this.outputFormatClass = outputFormatClass;
+  }
+
+  /**
+   * @return the mapper class to use for the job.
+   */
+  protected Class<? extends Mapper> getMapperClass()
+      throws ClassNotFoundException {
+    return this.mapperClass;
+  }
+
+  /**
+   * @return the inputformat class to use for the job.
+   */
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    return this.inputFormatClass;
+  }
+
+  /**
+   * @return the outputformat class to use for the job.
+   */
+  protected Class<? extends OutputFormat> getOutputFormatClass()
+      throws ClassNotFoundException {
+    return this.outputFormatClass;
+  }
+
+  /** Set the OutputFormat class to use for this job. */
+  public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
+    this.outputFormatClass = cls;
+  }
+
+  /** Set the InputFormat class to use for this job. */
+  public void setInputFormatClass(Class<? extends InputFormat> cls) {
+    this.inputFormatClass = cls;
+  }
+
+  /** Set the Mapper class to use for this job. */
+  public void setMapperClass(Class<? extends Mapper> cls) {
+    this.mapperClass = cls;
+  }
+
+  /**
+   * Set the SqoopOptions configuring this job.
+   */
+  public void setOptions(SqoopOptions opts) {
+    this.options = opts;
+  }
+
+  /**
+   * Put jar files required by Sqoop into the DistributedCache.
+   * @param job the Job being submitted.
+   * @param mgr the ConnManager to use.
+   */
+  protected void cacheJars(Job job, ConnManager mgr)
+      throws IOException {
+
+    Configuration conf = job.getConfiguration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Set<String> localUrls = new HashSet<String>();
+
+    addToCache(Jars.getSqoopJarPath(), fs, localUrls);
+    if (null != mgr) {
+      addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
+      addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
+    }
+
+    SqoopTool tool = this.options.getActiveSqoopTool();
+    if (null != tool) {
+      // Make sure the jar for the tool itself is on the classpath. (In case
+      // this is a third-party plugin tool.)
+      addToCache(Jars.getJarPathForClass(tool.getClass()), fs, localUrls);
+      List<String> toolDeps = tool.getDependencyJars();
+      if (null != toolDeps) {
+        for (String depFile : toolDeps) {
+          addToCache(depFile, fs, localUrls);
+        }
+      }
+    }
+
+    // If the user specified a particular jar file name,
+
+    // Add anything in $SQOOP_HOME/lib, if this is set.
+    String sqoopHome = System.getenv("SQOOP_HOME");
+    if (null != sqoopHome) {
+      File sqoopHomeFile = new File(sqoopHome);
+      File sqoopLibFile = new File(sqoopHomeFile, "lib");
+      if (sqoopLibFile.exists()) {
+        addDirToCache(sqoopLibFile, fs, localUrls);
+      }
+    } else {
+      LOG.warn("SQOOP_HOME is unset. May not be able to find "
+          + "all job dependencies.");
+    }
+
+    // If we didn't put anything in our set, then there's nothing to cache.
+    if (localUrls.isEmpty()) {
+      return;
+    }
+
+    // Add these to the 'tmpjars' array, which the MR JobSubmitter
+    // will upload to HDFS and put in the DistributedCache libjars.
+    String tmpjars = conf.get("tmpjars");
+    StringBuilder sb = new StringBuilder();
+    if (null != tmpjars) {
+      sb.append(tmpjars);
+      sb.append(",");
+    }
+    sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+    conf.set("tmpjars", sb.toString());
+  }
+
+  private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
+    if (null == file) {
+      return;
+    }
+
+    Path p = new Path(file);
+    String qualified = p.makeQualified(fs).toString();
+    LOG.debug("Adding to job classpath: " + qualified);
+    localUrls.add(qualified);
+  }
+
+  /**
+   * Add the .jar elements of a directory to the DCache classpath,
+   * nonrecursively.
+   */
+  private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
+    if (null == dir) {
+      return;
+    }
+
+    for (File libfile : dir.listFiles()) {
+      if (libfile.exists() && !libfile.isDirectory()
+          && libfile.getName().endsWith("jar")) {
+        addToCache(libfile.toString(), fs, localUrls);
+      }
+    }
+  }
+
+  /**
+   * If jars must be loaded into the local environment, do so here.
+   */
+  protected void loadJars(Configuration conf, String ormJarFile,
+      String tableClassName) throws IOException {
+    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
+        || "local".equals(conf.get("mapred.job.tracker"));
+    if (isLocal) {
+      // If we're using the LocalJobRunner, then instead of using the compiled
+      // jar file as the job source, we're running in the current thread. Push
+      // on another classloader that loads from that jar in addition to
+      // everything currently on the classpath.
+      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
+          tableClassName);
+    }
+  }
+
+  /**
+   * If any classloader was invoked by loadJars, free it here.
+   */
+  protected void unloadJars() {
+    if (null != this.prevClassLoader) {
+      // unload the special classloader for this jar.
+      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
+    }
+  }
+
+  /**
+   * Configure the inputformat to use for the job.
+   */
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+    //TODO: 'splitByCol' is import-job specific; lift it out of this API.
+    Class<? extends InputFormat> ifClass = getInputFormatClass();
+    LOG.debug("Using InputFormat: " + ifClass);
+    job.setInputFormatClass(ifClass);
+  }
+
+  /**
+   * Configure the output format to use for the job.
+   */
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+    Class<? extends OutputFormat> ofClass = getOutputFormatClass();
+    LOG.debug("Using OutputFormat: " + ofClass);
+    job.setOutputFormatClass(ofClass);
+  }
+
+  /**
+   * Set the mapper class implementation to use in the job,
+   * as well as any related configuration (e.g., map output types).
+   */
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+    job.setMapperClass(getMapperClass());
+  }
+
+  /**
+   * Configure the number of map/reduce tasks to use in the job.
+   */
+  protected int configureNumTasks(Job job) throws IOException {
+    int numMapTasks = options.getNumMappers();
+    if (numMapTasks < 1) {
+      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+    }
+
+    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
+    job.setNumReduceTasks(0);
+    return numMapTasks;
+  }
+
+  /** Set the main job that will be run. */
+  protected void setJob(Job job) {
+    mrJob = job;
+  }
+
+  /**
+   * @return the main MapReduce job that is being run, or null if no
+   * job has started.
+   */
+  public Job getJob() {
+    return mrJob;
+  }
+
+  /**
+   * Actually run the MapReduce job.
+   */
+  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+      InterruptedException {
+    return job.waitForCompletion(true);
+  }
+
+  /**
+   * Display a notice on the log that the current MapReduce job has
+   * been retired, and thus Counters are unavailable.
+   * @param log the Log to display the info to.
+   */
+  protected void displayRetiredJobNotice(Log log) {
+    log.info("The MapReduce job has already been retired. Performance");
+    log.info("counters are unavailable. To get this information, ");
+    log.info("you will need to enable the completed job store on ");
+    log.info("the jobtracker with:");
+    log.info("mapreduce.jobtracker.persist.jobstatus.active = true");
+    log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1");
+    log.info("A jobtracker restart is required for these settings");
+    log.info("to take effect.");
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.util.Jars;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.JobBase;
+
+/**
+ * Run a MapReduce job that merges two datasets.
+ */
+public class MergeJob extends JobBase {
+
+  /** Configuration key specifying the path to the "old" dataset. */
+  public static final String MERGE_OLD_PATH_KEY = "sqoop.merge.old.path";
+
+  /** Configuration key specifying the path to the "new" dataset. */
+  public static final String MERGE_NEW_PATH_KEY = "sqoop.merge.new.path";
+
+  /** Configuration key specifying the name of the key column for joins. */
+  public static final String MERGE_KEY_COL_KEY = "sqoop.merge.key.col";
+
+  /** Configuration key specifying the SqoopRecord class name for
+   * the records we are merging.
+   */
+  public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
+
+  public MergeJob(final SqoopOptions opts) {
+    super(opts, null, null, null);
+  }
+
+  public boolean runMergeJob() throws IOException {
+    Configuration conf = options.getConf();
+    Job job = new Job(conf);
+
+    String userClassName = options.getClassName();
+    if (null == userClassName) {
+      // Shouldn't get here.
+      throw new IOException("Record class name not specified with "
+          + "--class-name.");
+    }
+
+    // Set the external jar to use for the job.
+    String existingJar = options.getExistingJarName();
+    if (existingJar != null) {
+      // User explicitly identified a jar path.
+      LOG.debug("Setting job jar to user-specified jar: " + existingJar);
+      job.getConfiguration().set("mapred.jar", existingJar);
+    } else {
+      // Infer it from the location of the specified class, if it's on the
+      // classpath.
+      try {
+        Class<? extends Object> userClass = conf.getClassByName(userClassName);
+        if (null != userClass) {
+          String userJar = Jars.getJarPathForClass(userClass);
+          LOG.debug("Setting job jar based on user class " + userClassName
+              + ": " + userJar);
+          job.getConfiguration().set("mapred.jar", userJar);
+        } else {
+          LOG.warn("Specified class " + userClassName + " is not in a jar. "
+              + "MapReduce may not find the class");
+        }
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    }
+
+    try {
+      Path oldPath = new Path(options.getMergeOldPath());
+      Path newPath = new Path(options.getMergeNewPath());
+
+      Configuration jobConf = job.getConfiguration();
+      FileSystem fs = FileSystem.get(jobConf);
+      oldPath = oldPath.makeQualified(fs);
+      newPath = newPath.makeQualified(fs);
+
+      FileInputFormat.addInputPath(job, oldPath);
+      FileInputFormat.addInputPath(job, newPath);
+
+      jobConf.set(MERGE_OLD_PATH_KEY, oldPath.toString());
+      jobConf.set(MERGE_NEW_PATH_KEY, newPath.toString());
+      jobConf.set(MERGE_KEY_COL_KEY, options.getMergeKeyCol());
+      jobConf.set(MERGE_SQOOP_RECORD_KEY, userClassName);
+
+      FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
+
+      if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setMapperClass(MergeRecordMapper.class);
+      } else {
+        job.setMapperClass(MergeTextMapper.class);
+        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+      }
+
+      jobConf.set("mapred.output.key.class", userClassName);
+      job.setOutputValueClass(NullWritable.class);
+
+      job.setReducerClass(MergeReducer.class);
+
+      // Set the intermediate data types.
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(MergeRecord.class);
+
+      // Make sure Sqoop and anything else we need is on the classpath.
+      cacheJars(job, null);
+      setJob(job);
+      return this.runJob(job);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+  }
+}
+
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Given a set of SqoopRecord instances which are from a "new" dataset
+ * or an "old" dataset, extract a key column from the record and tag
+ * each record with a bit specifying whether it is a new or old record.
+ */
+public class MergeMapperBase<INKEY, INVAL>
+    extends Mapper<INKEY, INVAL, Text, MergeRecord> {
+
+  public static final Log LOG = LogFactory.getLog(
+      MergeMapperBase.class.getName());
+
+  private String keyColName; // name of the key column.
+  private boolean isNew; // true if this split is from the new dataset.
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
+
+    InputSplit is = context.getInputSplit();
+    FileSplit fs = (FileSplit) is;
+    Path splitPath = fs.getPath();
+
+    if (splitPath.toString().startsWith(
+        conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
+      this.isNew = true;
+    } else if (splitPath.toString().startsWith(
+        conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
+      this.isNew = false;
+    } else {
+      throw new IOException("File " + splitPath + " is not under new path "
+          + conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
+          + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
+    }
+  }
+
+  protected void processRecord(SqoopRecord r, Context c)
+      throws IOException, InterruptedException {
+    MergeRecord mr = new MergeRecord(r, isNew);
+    Map<String, Object> fieldMap = r.getFieldMap();
+    if (null == fieldMap) {
+      throw new IOException("No field map in record " + r);
+    }
+    Object keyObj = fieldMap.get(keyColName);
+    if (null == keyObj) {
+      throw new IOException("Cannot join values on null key. "
+          + "Did you specify a key column that exists?");
+    } else {
+      c.write(new Text(keyObj.toString()), mr);
+    }
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Class that holds a record to be merged. This contains a SqoopRecord which
+ * is the "guts" of the item, and a boolean value indicating whether it is a
+ * "new" record or an "old" record. In the Reducer, we prefer to emit a new
+ * record rather than an old one, if a new one is available.
+ */
+public class MergeRecord implements Configurable, Writable {
+  private SqoopRecord sqoopRecord;
+  private boolean isNew;
+  private Configuration config;
+
+  /** Construct an empty MergeRecord. */
+  public MergeRecord() {
+    this.sqoopRecord = null;
+    this.isNew = false;
+    this.config = new Configuration();
+  }
+
+  /**
+   * Construct a MergeRecord with all fields initialized.
+   */
+  public MergeRecord(SqoopRecord sr, boolean recordIsNew) {
+    this.sqoopRecord = sr;
+    this.isNew = recordIsNew;
+    this.config = new Configuration();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public void setConf(Configuration conf) {
+    this.config = conf;
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public Configuration getConf() {
+    return this.config;
+  }
+
+  /** @return true if this record came from the "new" dataset. */
+  public boolean isNewRecord() {
+    return isNew;
+  }
+
+  /**
+   * Set the isNew field to 'newVal'.
+   */
+  public void setNewRecord(boolean newVal) {
+    this.isNew = newVal;
+  }
+
+  /**
+   * @return the underlying SqoopRecord we're shipping.
+   */
+  public SqoopRecord getSqoopRecord() {
+    return this.sqoopRecord;
+  }
+
+  /**
+   * Set the SqoopRecord instance we should pass from the mapper to the
+   * reducer.
+   */
+  public void setSqoopRecord(SqoopRecord record) {
+    this.sqoopRecord = record;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.isNew = in.readBoolean();
+    String className = Text.readString(in);
+    if (null == this.sqoopRecord) {
+      // If we haven't already instantiated an inner SqoopRecord, do so here.
+      try {
+        Class<? extends SqoopRecord> recordClass =
+            (Class<? extends SqoopRecord>) config.getClassByName(className);
+        this.sqoopRecord = recordClass.newInstance();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    this.sqoopRecord.readFields(in);
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(this.isNew);
+    Text.writeString(out, this.sqoopRecord.getClass().getName());
+    this.sqoopRecord.write(out);
+  }
+
+  @Override
+  public String toString() {
+    return "" + this.sqoopRecord;
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.MergeMapperBase;
+
+/**
+ * Mapper for the merge program which operates on SequenceFiles.
+ */
+public class MergeRecordMapper
+    extends MergeMapperBase<LongWritable, SqoopRecord> {
+
+  public void map(LongWritable key, SqoopRecord val, Context c)
+      throws IOException, InterruptedException {
+    processRecord(val, c);
+  }
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Reducer for merge tool. Given records tagged as 'old' or 'new', emit
+ * a new one if possible; otherwise, an old one.
+ */
+public class MergeReducer
+    extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
+
+  @Override
+  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+      throws IOException, InterruptedException {
+    SqoopRecord bestRecord = null;
+    try {
+      for (MergeRecord val : vals) {
+        if (null == bestRecord && !val.isNewRecord()) {
+          // Use an old record if we don't have a new record.
+          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+        } else if (val.isNewRecord()) {
+          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+        }
+      }
+    } catch (CloneNotSupportedException cnse) {
+      throw new IOException(cnse);
+    }
+
+    if (null != bestRecord) {
+      c.write(bestRecord, NullWritable.get());
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.MergeMapperBase;
+
+/**
+ * Mapper for the merge program which operates on text files that we need to
+ * parse into SqoopRecord instances.
+ */
+public class MergeTextMapper extends MergeMapperBase<LongWritable, Text> {
+
+  private SqoopRecord record;
+
+  @Override
+  protected void setup(Context c) throws IOException, InterruptedException {
+    Configuration conf = c.getConfiguration();
+
+    Class<? extends SqoopRecord> recordClass =
+        (Class<? extends SqoopRecord>) conf.getClass(
+        MergeJob.MERGE_SQOOP_RECORD_KEY, SqoopRecord.class);
+    this.record = ReflectionUtils.newInstance(recordClass, conf);
+
+    super.setup(c);
+  }
+
+  public void map(LongWritable key, Text val, Context c)
+      throws IOException, InterruptedException {
+    try {
+      this.record.parse(val);
+    } catch (RecordParser.ParseError pe) {
+      throw new IOException(pe);
+    }
+
+    processRecord(this.record, c);
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an import job using mysqldump in the mapper.
+ */
+public class MySQLDumpImportJob extends ImportJobBase {
+
+  public static final Log LOG =
+      LogFactory.getLog(MySQLDumpImportJob.class.getName());
+
+  public MySQLDumpImportJob(final SqoopOptions opts, ImportJobContext context)
+      throws ClassNotFoundException {
+    super(opts, MySQLDumpMapper.class, MySQLDumpInputFormat.class,
+        RawKeyTextOutputFormat.class, context);
+  }
+
+  /**
+   * Configure the inputformat to use for the job.
+   */
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    if (null == tableName) {
+        LOG.error(
+            "mysqldump-based import cannot support free-form query imports.");
+        LOG.error("Do not use --direct and --query together for MySQL.");
+        throw new IOException("null tableName for MySQLDumpImportJob.");
+    }
+
+    ConnManager mgr = getContext().getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString(), username,
+          options.getPassword());
+    }
+
+    String [] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
+
+    String [] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+      }
+    }
+
+    // It's ok if the where clause is null in DBInputFormat.setInput.
+    String whereClause = options.getWhereClause();
+
+    // We can't set the class properly in here, because we may not have the
+    // jar loaded in this JVM. So we start by calling setInput() with
+    // DBWritable and then overriding the string manually.
+
+    // Note that mysqldump also does *not* want a quoted table name.
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+        tableName, whereClause,
+        mgr.escapeColName(splitByCol), sqlColNames);
+
+    Configuration conf = job.getConfiguration();
+    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+        options.getOutputFieldDelim());
+    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+        options.getOutputRecordDelim());
+    conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+        options.getOutputEnclosedBy());
+    conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+        options.getOutputEscapedBy());
+    conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
+        options.isOutputEncloseRequired());
+    String [] extraArgs = options.getExtraArgs();
+    if (null != extraArgs) {
+      conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
+    }
+
+    LOG.debug("Using InputFormat: " + inputFormatClass);
+    job.setInputFormatClass(getInputFormatClass());
+  }
+
+  /**
+   * Set the mapper class implementation to use in the job,
+   * as well as any related configuration (e.g., map output types).
+   */
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+    job.setMapperClass(getMapperClass());
+    job.setOutputKeyClass(String.class);
+    job.setOutputValueClass(NullWritable.class);
+  }
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * InputFormat designed to take data-driven splits and feed them to a mysqldump
+ * invocation running in the mapper.
+ *
+ * The key emitted by this mapper is a WHERE clause to use in the command
+ * to mysqldump.
+ */
+public class MySQLDumpInputFormat extends DataDrivenDBInputFormat {
+
+  public static final Log LOG = LogFactory.getLog(
+      MySQLDumpInputFormat.class.getName());
+
+  /**
+   * A RecordReader that just takes the WHERE conditions from the DBInputSplit
+   * and relates them to the mapper as a single input record.
+   */
+  public static class MySQLDumpRecordReader
+      extends RecordReader<String, NullWritable> {
+
+    private boolean delivered;
+    private String clause;
+
+    public MySQLDumpRecordReader(InputSplit split) {
+      initialize(split, null);
+    }
+
+    @Override
+    public boolean nextKeyValue() {
+      boolean hasNext = !delivered;
+      delivered = true;
+      return hasNext;
+    }
+
+    @Override
+    public String getCurrentKey() {
+      return clause;
+    }
+
+    @Override
+    public NullWritable getCurrentValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public float getProgress() {
+      return delivered ? 1.0f : 0.0f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
+          (DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;
+
+      this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
+          + dbSplit.getUpperClause() + ")";
+    }
+  }
+
+  public RecordReader<String, NullWritable> createRecordReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new MySQLDumpRecordReader(split);
+  }
+
+}
+