Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC
svn commit: r1190489 [5/6] - in /incubator/sqoop/trunk/src/java:
com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.hbase.HBasePutProcessor;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Runs an HBase import via DataDrivenDBInputFormat to the HBasePutProcessor
+ * in the DelegatingOutputFormat.
+ */
+public class HBaseImportJob extends DataDrivenImportJob {
+
+ public static final Log LOG = LogFactory.getLog(
+ HBaseImportJob.class.getName());
+
+ public HBaseImportJob(final SqoopOptions opts,
+ final ImportJobContext importContext) {
+ super(opts, importContext.getInputFormat(), importContext);
+ }
+
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws IOException {
+ job.setOutputKeyClass(SqoopRecord.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setMapperClass(getMapperClass());
+ }
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ return HBaseImportMapper.class;
+ }
+
+ @Override
+ protected Class<? extends OutputFormat> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return DelegatingOutputFormat.class;
+ }
+
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+
+ // Use the DelegatingOutputFormat with the HBasePutProcessor.
+ job.setOutputFormatClass(getOutputFormatClass());
+
+ Configuration conf = job.getConfiguration();
+ conf.setClass("sqoop.output.delegate.field.map.processor.class",
+ HBasePutProcessor.class,
+ FieldMapProcessor.class);
+
+ // Set the HBase parameters (table, column family, row key):
+ conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
+ conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
+
+ // What column of the input becomes the row key?
+ String rowKeyCol = options.getHBaseRowKeyColumn();
+ if (null == rowKeyCol) {
+ // User didn't explicitly set one. If there's a split-by column set,
+ // use that.
+ rowKeyCol = options.getSplitByCol();
+ }
+
+ if (null == rowKeyCol) {
+ // No split-by column is explicitly set.
+ // If the table has a primary key, use that.
+ ConnManager manager = getContext().getConnManager();
+ rowKeyCol = manager.getPrimaryKey(tableName);
+ }
+
+ if (null == rowKeyCol) {
+ // Give up here if this is still unset.
+ throw new IOException("Could not determine the row-key column. "
+ + "Use --hbase-row-key to specify the input column that "
+ + "names each row.");
+ }
+
+ conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, rowKeyCol);
+ }
+
+ @Override
+ /** Create the target HBase table before running the job. */
+ protected void jobSetup(Job job) throws IOException, ImportException {
+ Configuration conf = job.getConfiguration();
+ String tableName = conf.get(HBasePutProcessor.TABLE_NAME_KEY);
+ String familyName = conf.get(HBasePutProcessor.COL_FAMILY_KEY);
+
+ if (null == tableName) {
+ throw new ImportException(
+ "Import to HBase error: Table name not specified");
+ }
+
+ if (null == familyName) {
+ throw new ImportException(
+ "Import to HBase error: Column family not specified");
+ }
+
+ // Add HBase configuration files to this conf object.
+ HBaseConfiguration.addHbaseResources(conf);
+
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ // Check to see if the table exists.
+ HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+ byte [] familyBytes = Bytes.toBytes(familyName);
+ HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
+ if (!admin.tableExists(tableName)) {
+ if (options.getCreateHBaseTable()) {
+ // Create the table.
+ LOG.info("Creating missing HBase table " + tableName);
+ tableDesc.addFamily(colDesc);
+ admin.createTable(tableDesc);
+ } else {
+ LOG.warn("Could not find HBase table " + tableName);
+ LOG.warn("This job may fail. Either explicitly create the table,");
+ LOG.warn("or re-run with --hbase-create-table.");
+ }
+ } else if (!tableDesc.hasFamily(familyBytes)) {
+ if (options.getCreateHBaseTable()) {
+ // Create the column family.
+ LOG.info("Creating missing column family " + familyName);
+ admin.disableTable(tableName);
+ admin.addColumn(tableName, colDesc);
+ admin.enableTable(tableName);
+ } else {
+ LOG.warn("Could not find column family " + familyName + " in table "
+ + tableName);
+ LOG.warn("This job may fail. Either create the column family,");
+ LOG.warn("or re-run with --hbase-create-table.");
+ }
+ }
+
+ // Make sure HBase libraries are shipped as part of the job.
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.addDependencyJars(conf, HTable.class);
+
+ super.jobSetup(job);
+ }
+}
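
A note on the wiring above: HBasePutProcessor receives each SqoopRecord from
the DelegatingOutputFormat and turns its field map into an HBase Put keyed by
the configured row-key column. A minimal sketch of that transformation,
assuming String-convertible field values (the class and method names here are
illustrative, not part of this commit):

    import java.util.Map;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutSketch {
      // Build a Put from a record's field map; purely illustrative.
      public static Put toPut(Map<String, Object> fields, String rowKeyCol,
          byte[] family) {
        Object rowKey = fields.get(rowKeyCol);          // e.g. the "id" column
        Put put = new Put(Bytes.toBytes(rowKey.toString()));
        for (Map.Entry<String, Object> e : fields.entrySet()) {
          Object val = e.getValue();
          if (val != null) {
            put.add(family, Bytes.toBytes(e.getKey()),
                Bytes.toBytes(val.toString()));
          }
        }
        return put;
      }
    }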
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by writing them to HBase via the DelegatingOutputFormat
+ * and the HBasePutProcessor.
+ */
+public class HBaseImportMapper
+ extends AutoProgressMapper
+ <LongWritable, SqoopRecord, SqoopRecord, NullWritable> {
+
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ context.write(val, NullWritable.get());
+ }
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.util.PerfCounters;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.io.CodecMap;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Base class for running an import MapReduce job.
+ * Allows dependency injection, etc., for easy customization of import job types.
+ */
+public class ImportJobBase extends JobBase {
+
+ private ImportJobContext context;
+
+ public static final Log LOG = LogFactory.getLog(
+ ImportJobBase.class.getName());
+
+ public ImportJobBase() {
+ this(null);
+ }
+
+ public ImportJobBase(final SqoopOptions opts) {
+ this(opts, null, null, null, null);
+ }
+
+ public ImportJobBase(final SqoopOptions opts,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass,
+ final ImportJobContext context) {
+ super(opts, mapperClass, inputFormatClass, outputFormatClass);
+ this.context = context;
+ }
+
+ /**
+ * Configure the output format to use for the job.
+ */
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+
+ job.setOutputFormatClass(getOutputFormatClass());
+
+ if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
+ job.getConfiguration().set("mapred.output.value.class", tableClassName);
+ }
+
+ if (options.shouldUseCompression()) {
+ FileOutputFormat.setCompressOutput(job, true);
+
+ String codecName = options.getCompressionCodec();
+ Class<? extends CompressionCodec> codecClass;
+ if (codecName == null) {
+ codecClass = GzipCodec.class;
+ } else {
+ Configuration conf = job.getConfiguration();
+ codecClass = CodecMap.getCodec(codecName, conf).getClass();
+ }
+ FileOutputFormat.setOutputCompressorClass(job, codecClass);
+
+ if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
+ SequenceFileOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+ }
+ }
+
+ Path outputPath = context.getDestination();
+ FileOutputFormat.setOutputPath(job, outputPath);
+ }
+
+ /**
+ * Actually run the MapReduce job.
+ */
+ @Override
+ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+ InterruptedException {
+
+ PerfCounters perfCounters = new PerfCounters();
+ perfCounters.startClock();
+
+ boolean success = job.waitForCompletion(true);
+ perfCounters.stopClock();
+
+ Counters jobCounters = job.getCounters();
+ // If the job has been retired, these may be unavailable.
+ if (null == jobCounters) {
+ displayRetiredJobNotice(LOG);
+ } else {
+ perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
+ .findCounter("HDFS_BYTES_WRITTEN").getValue());
+ LOG.info("Transferred " + perfCounters.toString());
+ long numRecords = ConfigurationHelper.getNumMapOutputRecords(job);
+ LOG.info("Retrieved " + numRecords + " records.");
+ }
+ return success;
+ }
+
+
+ /**
+ * Run an import job to read a table into HDFS.
+ *
+ * @param tableName the database table to read; may be null if a free-form
+ * query is specified in the SqoopOptions, and the ImportJobBase subclass
+ * supports free-form queries.
+ * @param ormJarFile the Jar file to insert into the dcache classpath.
+ * (may be null)
+ * @param splitByCol the column of the database table to use to split
+ * the import
+ * @param conf A fresh Hadoop Configuration to use to build an MR job.
+ * @throws IOException if the job encountered an IO problem
+ * @throws ImportException if the job failed unexpectedly or was
+ * misconfigured.
+ */
+ public void runImport(String tableName, String ormJarFile, String splitByCol,
+ Configuration conf) throws IOException, ImportException {
+
+ if (null != tableName) {
+ LOG.info("Beginning import of " + tableName);
+ } else {
+ LOG.info("Beginning query import.");
+ }
+
+ String tableClassName =
+ new TableClassName(options).getClassForTable(tableName);
+ loadJars(conf, ormJarFile, tableClassName);
+
+ try {
+ Job job = new Job(conf);
+
+ // Set the external jar to use for the job.
+ job.getConfiguration().set("mapred.jar", ormJarFile);
+
+ configureInputFormat(job, tableName, tableClassName, splitByCol);
+ configureOutputFormat(job, tableName, tableClassName);
+ configureMapper(job, tableName, tableClassName);
+ configureNumTasks(job);
+ cacheJars(job, getContext().getConnManager());
+
+ jobSetup(job);
+ setJob(job);
+ boolean success = runJob(job);
+ if (!success) {
+ throw new ImportException("Import job failed!");
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ } finally {
+ unloadJars();
+ }
+ }
+
+ /**
+ * Open-ended "setup" routine that is called after the job is configured
+ * but just before it is submitted to MapReduce. Subclasses may override
+ * if necessary.
+ */
+ protected void jobSetup(Job job) throws IOException, ImportException {
+ }
+
+ protected ImportJobContext getContext() {
+ return context;
+ }
+}
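
The runImport() method above is a template: input format, output format,
mapper, and task counts are configured in sequence, and then the jobSetup()
hook fires just before submission. A minimal, hypothetical subclass showing
how the hook composes (the class name and log line are invented for
illustration):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import com.cloudera.sqoop.SqoopOptions;
    import com.cloudera.sqoop.manager.ImportJobContext;
    import com.cloudera.sqoop.util.ImportException;

    public class AuditedImportJob extends ImportJobBase {
      public AuditedImportJob(SqoopOptions opts, ImportJobContext ctxt) {
        super(opts, null, ctxt.getInputFormat(), null, ctxt);
      }

      @Override
      protected void jobSetup(Job job) throws IOException, ImportException {
        // Log the destination right before the job is submitted.
        LOG.info("Submitting import to " + getContext().getDestination());
        super.jobSetup(job);
      }
    }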
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * Run an export using JDBC (JDBC-based ExportOutputFormat).
+ */
+public class JdbcExportJob extends ExportJobBase {
+
+ private FileType fileType;
+
+ public static final Log LOG = LogFactory.getLog(
+ JdbcExportJob.class.getName());
+
+ public JdbcExportJob(final ExportJobContext context) {
+ super(context);
+ }
+
+ public JdbcExportJob(final ExportJobContext ctxt,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ }
+
+ @Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+
+ fileType = getInputFileType();
+
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+ if (fileType == FileType.AVRO_DATA_FILE) {
+ LOG.debug("Configuring for Avro export");
+ ConnManager connManager = context.getConnManager();
+ Map<String, Integer> columnTypeInts =
+ connManager.getColumnTypes(tableName, options.getSqlQuery());
+ MapWritable columnTypes = new MapWritable();
+ for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+ Text columnName = new Text(e.getKey());
+ Text columnText = new Text(connManager.toJavaType(e.getValue()));
+ columnTypes.put(columnName, columnText);
+ }
+ DefaultStringifier.store(job.getConfiguration(), columnTypes,
+ AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+ }
+
+ }
+
+ @Override
+ protected Class<? extends InputFormat> getInputFormatClass()
+ throws ClassNotFoundException {
+ if (fileType == FileType.AVRO_DATA_FILE) {
+ return AvroInputFormat.class;
+ }
+ return super.getInputFormatClass();
+ }
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ switch (fileType) {
+ case SEQUENCE_FILE:
+ return SequenceFileExportMapper.class;
+ case AVRO_DATA_FILE:
+ return AvroExportMapper.class;
+ case UNKNOWN:
+ default:
+ return TextExportMapper.class;
+ }
+ }
+
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws IOException {
+
+ ConnManager mgr = context.getConnManager();
+ try {
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ username, options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+ DBOutputFormat.setOutput(job, tableName, colNames);
+
+ job.setOutputFormatClass(getOutputFormatClass());
+ job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load OutputFormat", cnfe);
+ }
+ }
+
+}
+
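The Avro branch of configureInputFormat() stores the column-name-to-Java-type
map in the job configuration via DefaultStringifier. A sketch of how a task
can read it back (the configuration key is from the code above; the wrapper
class is assumed):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DefaultStringifier;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Writable;

    public class ColumnTypesSketch {
      public static void dump(Configuration conf) throws IOException {
        MapWritable columnTypes = DefaultStringifier.load(conf,
            AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class);
        for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
          // e.g. "id -> Integer", "name -> String"
          System.out.println(e.getKey() + " -> " + e.getValue());
        }
      }
    }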
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
+ */
+public class JdbcUpdateExportJob extends ExportJobBase {
+
+ public static final Log LOG = LogFactory.getLog(
+ JdbcUpdateExportJob.class.getName());
+
+ /**
+ * Return an instance of the UpdateOutputFormat class object loaded
+ * from the shim jar.
+ */
+ private static Class<? extends OutputFormat> getUpdateOutputFormat()
+ throws IOException {
+ return UpdateOutputFormat.class;
+ }
+
+ public JdbcUpdateExportJob(final ExportJobContext context)
+ throws IOException {
+ super(context, null, null, getUpdateOutputFormat());
+ }
+
+ public JdbcUpdateExportJob(final ExportJobContext ctxt,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ }
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ if (inputIsSequenceFiles()) {
+ return SequenceFileExportMapper.class;
+ } else {
+ return TextExportMapper.class;
+ }
+ }
+
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws IOException {
+
+ ConnManager mgr = context.getConnManager();
+ try {
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ username, options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ if (null == colNames) {
+ throw new IOException(
+ "Export column names could not be determined for " + tableName);
+ }
+
+ String updateKeyColumns = options.getUpdateKeyCol();
+ if (null == updateKeyColumns) {
+ throw new IOException("Update key column not set in export job");
+ }
+ // Update key columns lookup and removal
+ Set<String> updateKeys = new LinkedHashSet<String>();
+ Set<String> updateKeysUppercase = new HashSet<String>();
+ StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+ while (stok.hasMoreTokens()) {
+ String nextUpdateKey = stok.nextToken().trim();
+ if (nextUpdateKey.length() > 0) {
+ updateKeys.add(nextUpdateKey);
+ updateKeysUppercase.add(nextUpdateKey.toUpperCase());
+ } else {
+ throw new RuntimeException("Invalid update key column value specified"
+ + ": '" + updateKeyColumns + "'");
+ }
+ }
+
+ if (updateKeys.size() == 0) {
+ throw new IOException("Unpdate key columns not valid in export job");
+ }
+
+ // Make sure we strip out the key column from this list.
+ String [] outColNames = new String[colNames.length - updateKeys.size()];
+ int j = 0;
+ for (int i = 0; i < colNames.length; i++) {
+ if (!updateKeysUppercase.contains(colNames[i].toUpperCase())) {
+ outColNames[j++] = colNames[i];
+ }
+ }
+ DBOutputFormat.setOutput(job, tableName, outColNames);
+
+ job.setOutputFormatClass(getOutputFormatClass());
+ job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+ job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load OutputFormat", cnfe);
+ }
+ }
+}
+
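The stripping logic above removes the update-key columns from the list handed
to DBOutputFormat.setOutput(), since those columns are used to match rows
rather than to set values. A standalone sketch of the same filtering with
hypothetical inputs:

    import java.util.HashSet;
    import java.util.Set;

    public class UpdateColumnSketch {
      public static void main(String[] args) {
        String[] colNames = {"id", "name", "price"};  // hypothetical table
        Set<String> updateKeysUppercase = new HashSet<String>();
        updateKeysUppercase.add("ID");                // --update-key id

        String[] outColNames =
            new String[colNames.length - updateKeysUppercase.size()];
        int j = 0;
        for (int i = 0; i < colNames.length; i++) {
          if (!updateKeysUppercase.contains(colNames[i].toUpperCase())) {
            outColNames[j++] = colNames[i];
          }
        }
        // outColNames is now {"name", "price"}; "id" only matches rows.
      }
    }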
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
+ */
+public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
+
+ public static final Log LOG = LogFactory.getLog(
+ JdbcUpsertExportJob.class.getName());
+
+ public JdbcUpsertExportJob(final ExportJobContext context,
+ final Class<? extends OutputFormat> outputFormatClass)
+ throws IOException {
+ super(context, null, null, outputFormatClass);
+ }
+
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws IOException {
+
+ ConnManager mgr = context.getConnManager();
+ try {
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ username, options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+ if (null == colNames) {
+ throw new IOException(
+ "Export column names could not be determined for " + tableName);
+ }
+ DBOutputFormat.setOutput(job, tableName, colNames);
+
+ String updateKeyColumns = options.getUpdateKeyCol();
+ if (null == updateKeyColumns) {
+ throw new IOException("Update key column not set in export job");
+ }
+ // Update key columns lookup and removal
+ Set<String> updateKeys = new LinkedHashSet<String>();
+ StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+ while (stok.hasMoreTokens()) {
+ String nextUpdateKey = stok.nextToken().trim();
+ if (nextUpdateKey.length() > 0) {
+ updateKeys.add(nextUpdateKey);
+ } else {
+ throw new RuntimeException("Invalid update key column value specified"
+ + ": '" + updateKeyColumns + "'");
+ }
+ }
+
+ if (updateKeys.size() == 0) {
+ throw new IOException("Unpdate key columns not valid in export job");
+ }
+
+ job.setOutputFormatClass(getOutputFormatClass());
+ job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+ job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load OutputFormat", cnfe);
+ }
+ }
+}
+
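Note the contrast with JdbcUpdateExportJob: the upsert job passes the full
column list, keys included, to DBOutputFormat.setOutput(), because a record
that matches no existing row must be insertable with every column populated.
How the update-or-insert is expressed in SQL is left to the database-specific
OutputFormat passed into the constructor. A hypothetical side-by-side:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;

    public class UpsertVsUpdateSketch {
      static void configure(Job job) throws IOException {
        // Upsert path: all columns, keys included.
        DBOutputFormat.setOutput(job, "products",
            new String[] {"id", "name", "price"});

        // Update path (JdbcUpdateExportJob) would strip the key first:
        // DBOutputFormat.setOutput(job, "products",
        //     new String[] {"name", "price"});
      }
    }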
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.tool.SqoopTool;
+import com.cloudera.sqoop.util.ClassLoaderStack;
+import com.cloudera.sqoop.util.Jars;
+
+/**
+ * Base class for configuring and running a MapReduce job.
+ * Allows dependency injection, etc., for easy customization of import job types.
+ */
+public class JobBase {
+
+ public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
+
+ protected SqoopOptions options;
+ protected Class<? extends Mapper> mapperClass;
+ protected Class<? extends InputFormat> inputFormatClass;
+ protected Class<? extends OutputFormat> outputFormatClass;
+
+ private Job mrJob;
+
+ private ClassLoader prevClassLoader = null;
+
+ public JobBase() {
+ this(null);
+ }
+
+ public JobBase(final SqoopOptions opts) {
+ this(opts, null, null, null);
+ }
+
+ public JobBase(final SqoopOptions opts,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+
+ this.options = opts;
+ this.mapperClass = mapperClass;
+ this.inputFormatClass = inputFormatClass;
+ this.outputFormatClass = outputFormatClass;
+ }
+
+ /**
+ * @return the mapper class to use for the job.
+ */
+ protected Class<? extends Mapper> getMapperClass()
+ throws ClassNotFoundException {
+ return this.mapperClass;
+ }
+
+ /**
+ * @return the inputformat class to use for the job.
+ */
+ protected Class<? extends InputFormat> getInputFormatClass()
+ throws ClassNotFoundException {
+ return this.inputFormatClass;
+ }
+
+ /**
+ * @return the outputformat class to use for the job.
+ */
+ protected Class<? extends OutputFormat> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return this.outputFormatClass;
+ }
+
+ /** Set the OutputFormat class to use for this job. */
+ public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
+ this.outputFormatClass = cls;
+ }
+
+ /** Set the InputFormat class to use for this job. */
+ public void setInputFormatClass(Class<? extends InputFormat> cls) {
+ this.inputFormatClass = cls;
+ }
+
+ /** Set the Mapper class to use for this job. */
+ public void setMapperClass(Class<? extends Mapper> cls) {
+ this.mapperClass = cls;
+ }
+
+ /**
+ * Set the SqoopOptions configuring this job.
+ */
+ public void setOptions(SqoopOptions opts) {
+ this.options = opts;
+ }
+
+ /**
+ * Put jar files required by Sqoop into the DistributedCache.
+ * @param job the Job being submitted.
+ * @param mgr the ConnManager to use.
+ */
+ protected void cacheJars(Job job, ConnManager mgr)
+ throws IOException {
+
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Set<String> localUrls = new HashSet<String>();
+
+ addToCache(Jars.getSqoopJarPath(), fs, localUrls);
+ if (null != mgr) {
+ addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
+ addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
+ }
+
+ SqoopTool tool = this.options.getActiveSqoopTool();
+ if (null != tool) {
+ // Make sure the jar for the tool itself is on the classpath. (In case
+ // this is a third-party plugin tool.)
+ addToCache(Jars.getJarPathForClass(tool.getClass()), fs, localUrls);
+ List<String> toolDeps = tool.getDependencyJars();
+ if (null != toolDeps) {
+ for (String depFile : toolDeps) {
+ addToCache(depFile, fs, localUrls);
+ }
+ }
+ }
+
+
+ // Add anything in $SQOOP_HOME/lib, if this is set.
+ String sqoopHome = System.getenv("SQOOP_HOME");
+ if (null != sqoopHome) {
+ File sqoopHomeFile = new File(sqoopHome);
+ File sqoopLibFile = new File(sqoopHomeFile, "lib");
+ if (sqoopLibFile.exists()) {
+ addDirToCache(sqoopLibFile, fs, localUrls);
+ }
+ } else {
+ LOG.warn("SQOOP_HOME is unset. May not be able to find "
+ + "all job dependencies.");
+ }
+
+ // If we didn't put anything in our set, then there's nothing to cache.
+ if (localUrls.isEmpty()) {
+ return;
+ }
+
+ // Add these to the 'tmpjars' array, which the MR JobSubmitter
+ // will upload to HDFS and put in the DistributedCache libjars.
+ String tmpjars = conf.get("tmpjars");
+ StringBuilder sb = new StringBuilder();
+ if (null != tmpjars) {
+ sb.append(tmpjars);
+ sb.append(",");
+ }
+ sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+ conf.set("tmpjars", sb.toString());
+ }
+
+ private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
+ if (null == file) {
+ return;
+ }
+
+ Path p = new Path(file);
+ String qualified = p.makeQualified(fs).toString();
+ LOG.debug("Adding to job classpath: " + qualified);
+ localUrls.add(qualified);
+ }
+
+ /**
+ * Add the .jar elements of a directory to the DCache classpath,
+ * nonrecursively.
+ */
+ private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
+ if (null == dir) {
+ return;
+ }
+
+ for (File libfile : dir.listFiles()) {
+ if (libfile.exists() && !libfile.isDirectory()
+ && libfile.getName().endsWith("jar")) {
+ addToCache(libfile.toString(), fs, localUrls);
+ }
+ }
+ }
+
+ /**
+ * If jars must be loaded into the local environment, do so here.
+ */
+ protected void loadJars(Configuration conf, String ormJarFile,
+ String tableClassName) throws IOException {
+ boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
+ || "local".equals(conf.get("mapred.job.tracker"));
+ if (isLocal) {
+ // If we're using the LocalJobRunner, then instead of using the compiled
+ // jar file as the job source, we're running in the current thread. Push
+ // on another classloader that loads from that jar in addition to
+ // everything currently on the classpath.
+ this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
+ tableClassName);
+ }
+ }
+
+ /**
+ * If any classloader was invoked by loadJars, free it here.
+ */
+ protected void unloadJars() {
+ if (null != this.prevClassLoader) {
+ // unload the special classloader for this jar.
+ ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
+ }
+ }
+
+ /**
+ * Configure the inputformat to use for the job.
+ */
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+ //TODO: 'splitByCol' is import-job specific; lift it out of this API.
+ Class<? extends InputFormat> ifClass = getInputFormatClass();
+ LOG.debug("Using InputFormat: " + ifClass);
+ job.setInputFormatClass(ifClass);
+ }
+
+ /**
+ * Configure the output format to use for the job.
+ */
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+ Class<? extends OutputFormat> ofClass = getOutputFormatClass();
+ LOG.debug("Using OutputFormat: " + ofClass);
+ job.setOutputFormatClass(ofClass);
+ }
+
+ /**
+ * Set the mapper class implementation to use in the job,
+ * as well as any related configuration (e.g., map output types).
+ */
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+ job.setMapperClass(getMapperClass());
+ }
+
+ /**
+ * Configure the number of map/reduce tasks to use in the job.
+ */
+ protected int configureNumTasks(Job job) throws IOException {
+ int numMapTasks = options.getNumMappers();
+ if (numMapTasks < 1) {
+ numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+ LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+ }
+
+ ConfigurationHelper.setJobNumMaps(job, numMapTasks);
+ job.setNumReduceTasks(0);
+ return numMapTasks;
+ }
+
+ /** Set the main job that will be run. */
+ protected void setJob(Job job) {
+ mrJob = job;
+ }
+
+ /**
+ * @return the main MapReduce job that is being run, or null if no
+ * job has started.
+ */
+ public Job getJob() {
+ return mrJob;
+ }
+
+ /**
+ * Actually run the MapReduce job.
+ */
+ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+ InterruptedException {
+ return job.waitForCompletion(true);
+ }
+
+ /**
+ * Display a notice on the log that the current MapReduce job has
+ * been retired, and thus Counters are unavailable.
+ * @param log the Log to display the info to.
+ */
+ protected void displayRetiredJobNotice(Log log) {
+ log.info("The MapReduce job has already been retired. Performance");
+ log.info("counters are unavailable. To get this information, ");
+ log.info("you will need to enable the completed job store on ");
+ log.info("the jobtracker with:");
+ log.info("mapreduce.jobtracker.persist.jobstatus.active = true");
+ log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1");
+ log.info("A jobtracker restart is required for these settings");
+ log.info("to take effect.");
+ }
+}
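cacheJars() above accumulates every locally resolved jar into the "tmpjars"
configuration property, the same comma-separated list the MapReduce client
builds for -libjars; the job submitter uploads each entry and puts it on the
task classpath via the DistributedCache. A sketch of what the property can
end up holding (paths are hypothetical):

    import org.apache.hadoop.conf.Configuration;

    public class TmpJarsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("tmpjars",
            "file:/opt/sqoop/sqoop-1.4.0-incubating.jar,"
            + "file:/opt/sqoop/lib/mysql-connector-java-5.1.17.jar");
        System.out.println(conf.get("tmpjars"));
      }
    }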
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.util.Jars;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.JobBase;
+
+/**
+ * Run a MapReduce job that merges two datasets.
+ */
+public class MergeJob extends JobBase {
+
+ /** Configuration key specifying the path to the "old" dataset. */
+ public static final String MERGE_OLD_PATH_KEY = "sqoop.merge.old.path";
+
+ /** Configuration key specifying the path to the "new" dataset. */
+ public static final String MERGE_NEW_PATH_KEY = "sqoop.merge.new.path";
+
+ /** Configuration key specifying the name of the key column for joins. */
+ public static final String MERGE_KEY_COL_KEY = "sqoop.merge.key.col";
+
+ /** Configuration key specifying the SqoopRecord class name for
+ * the records we are merging.
+ */
+ public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
+
+ public MergeJob(final SqoopOptions opts) {
+ super(opts, null, null, null);
+ }
+
+ public boolean runMergeJob() throws IOException {
+ Configuration conf = options.getConf();
+ Job job = new Job(conf);
+
+ String userClassName = options.getClassName();
+ if (null == userClassName) {
+ // Shouldn't get here.
+ throw new IOException("Record class name not specified with "
+ + "--class-name.");
+ }
+
+ // Set the external jar to use for the job.
+ String existingJar = options.getExistingJarName();
+ if (existingJar != null) {
+ // User explicitly identified a jar path.
+ LOG.debug("Setting job jar to user-specified jar: " + existingJar);
+ job.getConfiguration().set("mapred.jar", existingJar);
+ } else {
+ // Infer it from the location of the specified class, if it's on the
+ // classpath.
+ try {
+ Class<? extends Object> userClass = conf.getClassByName(userClassName);
+ if (null != userClass) {
+ String userJar = Jars.getJarPathForClass(userClass);
+ LOG.debug("Setting job jar based on user class " + userClassName
+ + ": " + userJar);
+ job.getConfiguration().set("mapred.jar", userJar);
+ } else {
+ LOG.warn("Specified class " + userClassName + " is not in a jar. "
+ + "MapReduce may not find the class");
+ }
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+ }
+
+ try {
+ Path oldPath = new Path(options.getMergeOldPath());
+ Path newPath = new Path(options.getMergeNewPath());
+
+ Configuration jobConf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(jobConf);
+ oldPath = oldPath.makeQualified(fs);
+ newPath = newPath.makeQualified(fs);
+
+ FileInputFormat.addInputPath(job, oldPath);
+ FileInputFormat.addInputPath(job, newPath);
+
+ jobConf.set(MERGE_OLD_PATH_KEY, oldPath.toString());
+ jobConf.set(MERGE_NEW_PATH_KEY, newPath.toString());
+ jobConf.set(MERGE_KEY_COL_KEY, options.getMergeKeyCol());
+ jobConf.set(MERGE_SQOOP_RECORD_KEY, userClassName);
+
+ FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
+
+ if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(MergeRecordMapper.class);
+ } else {
+ job.setMapperClass(MergeTextMapper.class);
+ job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+ }
+
+ jobConf.set("mapred.output.key.class", userClassName);
+ job.setOutputValueClass(NullWritable.class);
+
+ job.setReducerClass(MergeReducer.class);
+
+ // Set the intermediate data types.
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(MergeRecord.class);
+
+ // Make sure Sqoop and anything else we need is on the classpath.
+ cacheJars(job, null);
+ setJob(job);
+ return this.runJob(job);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+ }
+}
+
+
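The four MERGE_* keys set in runMergeJob() are the contract between the
driver and the merge tasks that follow below. A sketch of the values they
might carry (all paths and the class name are hypothetical):

    import org.apache.hadoop.conf.Configuration;

    public class MergeConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(MergeJob.MERGE_OLD_PATH_KEY,
            "hdfs://namenode/user/me/orders");       // prior full import
        conf.set(MergeJob.MERGE_NEW_PATH_KEY,
            "hdfs://namenode/user/me/orders_inc");   // incremental import
        conf.set(MergeJob.MERGE_KEY_COL_KEY, "id");  // join column
        conf.set(MergeJob.MERGE_SQOOP_RECORD_KEY,
            "com.example.Orders");                   // generated record class
      }
    }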
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Given a set of SqoopRecord instances which are from a "new" dataset
+ * or an "old" dataset, extract a key column from the record and tag
+ * each record with a bit specifying whether it is a new or old record.
+ */
+public class MergeMapperBase<INKEY, INVAL>
+ extends Mapper<INKEY, INVAL, Text, MergeRecord> {
+
+ public static final Log LOG = LogFactory.getLog(
+ MergeMapperBase.class.getName());
+
+ private String keyColName; // name of the key column.
+ private boolean isNew; // true if this split is from the new dataset.
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
+
+ InputSplit is = context.getInputSplit();
+ FileSplit fs = (FileSplit) is;
+ Path splitPath = fs.getPath();
+
+ if (splitPath.toString().startsWith(
+ conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
+ this.isNew = true;
+ } else if (splitPath.toString().startsWith(
+ conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
+ this.isNew = false;
+ } else {
+ throw new IOException("File " + splitPath + " is not under new path "
+ + conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
+ + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
+ }
+ }
+
+ protected void processRecord(SqoopRecord r, Context c)
+ throws IOException, InterruptedException {
+ MergeRecord mr = new MergeRecord(r, isNew);
+ Map<String, Object> fieldMap = r.getFieldMap();
+ if (null == fieldMap) {
+ throw new IOException("No field map in record " + r);
+ }
+ Object keyObj = fieldMap.get(keyColName);
+ if (null == keyObj) {
+ throw new IOException("Cannot join values on null key. "
+ + "Did you specify a key column that exists?");
+ } else {
+ c.write(new Text(keyObj.toString()), mr);
+ }
+ }
+}
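
setup() decides new-versus-old with a plain string prefix test against the
configured paths, which is why runMergeJob() qualifies both paths through the
FileSystem before storing them: both sides of the comparison must be in the
same fully qualified form. A sketch with hypothetical paths:

    import org.apache.hadoop.fs.Path;

    public class SplitClassifySketch {
      public static void main(String[] args) {
        String newPath = "hdfs://namenode/user/me/orders_inc";
        Path split =
            new Path("hdfs://namenode/user/me/orders_inc/part-m-00000");
        System.out.println(split.toString().startsWith(newPath)); // true
      }
    }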
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Class that holds a record to be merged. This contains a SqoopRecord which
+ * is the "guts" of the item, and a boolean value indicating whether it is a
+ * "new" record or an "old" record. In the Reducer, we prefer to emit a new
+ * record rather than an old one, if a new one is available.
+ */
+public class MergeRecord implements Configurable, Writable {
+ private SqoopRecord sqoopRecord;
+ private boolean isNew;
+ private Configuration config;
+
+ /** Construct an empty MergeRecord. */
+ public MergeRecord() {
+ this.sqoopRecord = null;
+ this.isNew = false;
+ this.config = new Configuration();
+ }
+
+ /**
+ * Construct a MergeRecord with all fields initialized.
+ */
+ public MergeRecord(SqoopRecord sr, boolean recordIsNew) {
+ this.sqoopRecord = sr;
+ this.isNew = recordIsNew;
+ this.config = new Configuration();
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void setConf(Configuration conf) {
+ this.config = conf;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public Configuration getConf() {
+ return this.config;
+ }
+
+ /** @return true if this record came from the "new" dataset. */
+ public boolean isNewRecord() {
+ return isNew;
+ }
+
+ /**
+ * Set the isNew field to 'newVal'.
+ */
+ public void setNewRecord(boolean newVal) {
+ this.isNew = newVal;
+ }
+
+ /**
+ * @return the underlying SqoopRecord we're shipping.
+ */
+ public SqoopRecord getSqoopRecord() {
+ return this.sqoopRecord;
+ }
+
+ /**
+ * Set the SqoopRecord instance we should pass from the mapper to the
+ * reducer.
+ */
+ public void setSqoopRecord(SqoopRecord record) {
+ this.sqoopRecord = record;
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public void readFields(DataInput in) throws IOException {
+ this.isNew = in.readBoolean();
+ String className = Text.readString(in);
+ if (null == this.sqoopRecord) {
+ // If we haven't already instantiated an inner SqoopRecord, do so here.
+ try {
+ Class<? extends SqoopRecord> recordClass =
+ (Class<? extends SqoopRecord>) config.getClassByName(className);
+ this.sqoopRecord = recordClass.newInstance();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ this.sqoopRecord.readFields(in);
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(this.isNew);
+ Text.writeString(out, this.sqoopRecord.getClass().getName());
+ this.sqoopRecord.write(out);
+ }
+
+ @Override
+ public String toString() {
+ return "" + this.sqoopRecord;
+ }
+}
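
Per write() and readFields() above, MergeRecord's wire format is a boolean
tag, then the record's class name as a Hadoop Text string, then the wrapped
record's own fields; deserialization resolves the class name through the
record's Configuration. A round-trip sketch, with MyRecord standing in for a
generated SqoopRecord class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import com.cloudera.sqoop.lib.SqoopRecord;

    public class MergeRecordRoundTrip {
      public static void main(String[] args) throws Exception {
        SqoopRecord rec = new MyRecord();    // hypothetical generated class
        MergeRecord out = new MergeRecord(rec, true);

        DataOutputBuffer buf = new DataOutputBuffer();
        out.write(buf);                      // boolean + class name + fields

        MergeRecord in = new MergeRecord();
        in.setConf(new Configuration());     // used by getClassByName()
        DataInputBuffer ibuf = new DataInputBuffer();
        ibuf.reset(buf.getData(), buf.getLength());
        in.readFields(ibuf);                 // reconstructs a MyRecord
      }
    }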
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.MergeMapperBase;
+
+/**
+ * Mapper for the merge program that operates on SequenceFiles.
+ */
+public class MergeRecordMapper
+ extends MergeMapperBase<LongWritable, SqoopRecord> {
+
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context c)
+ throws IOException, InterruptedException {
+ processRecord(val, c);
+ }
+
+}
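MergeRecordMapper applies when the merge input is SequenceFiles whose
values are already SqoopRecord instances, so no parsing is needed. A
driver would wire it up roughly as follows (a sketch; the actual wiring
lives in MergeJob, elsewhere in this commit):

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(MergeRecordMapper.class);
    job.setMapOutputKeyClass(Text.class);          // the merge key
    job.setMapOutputValueClass(MergeRecord.class);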
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Reducer for the merge tool. Given records tagged as 'old' or 'new',
+ * emits the new record for a key when one exists; otherwise emits an
+ * old one.
+ */
+public class MergeReducer
+ extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
+
+ @Override
+ public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+ throws IOException, InterruptedException {
+ SqoopRecord bestRecord = null;
+ try {
+ for (MergeRecord val : vals) {
+ if (null == bestRecord && !val.isNewRecord()) {
+ // Use an old record if we don't have a new record.
+ bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+ } else if (val.isNewRecord()) {
+ bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+ }
+ }
+ } catch (CloneNotSupportedException cnse) {
+ throw new IOException(cnse);
+ }
+
+ if (null != bestRecord) {
+ c.write(bestRecord, NullWritable.get());
+ }
+ }
+}
+
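As a worked example of the precedence logic, consider a key whose tagged
values arrive as [old:A, new:B, old:C]:

    // old:A -> bestRecord = A   (no new record seen yet)
    // new:B -> bestRecord = B   (a new record always wins)
    // old:C -> ignored          (an old record never replaces a new one)

If several new records share a key, whichever one the reducer iterates
last is the one written out.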
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.MergeMapperBase;
+
+/**
+ * Mapper for the merge program that parses lines of text files into
+ * SqoopRecord instances.
+ */
+public class MergeTextMapper extends MergeMapperBase<LongWritable, Text> {
+
+ private SqoopRecord record;
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void setup(Context c) throws IOException, InterruptedException {
+ Configuration conf = c.getConfiguration();
+
+ Class<? extends SqoopRecord> recordClass =
+ (Class<? extends SqoopRecord>) conf.getClass(
+ MergeJob.MERGE_SQOOP_RECORD_KEY, SqoopRecord.class);
+ this.record = ReflectionUtils.newInstance(recordClass, conf);
+
+ super.setup(c);
+ }
+
+ @Override
+ public void map(LongWritable key, Text val, Context c)
+ throws IOException, InterruptedException {
+ try {
+ this.record.parse(val);
+ } catch (RecordParser.ParseError pe) {
+ throw new IOException(pe);
+ }
+
+ processRecord(this.record, c);
+ }
+}
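MergeTextMapper depends on the driver publishing the generated record
class under MergeJob.MERGE_SQOOP_RECORD_KEY before the job is submitted.
A sketch of that handshake (FooRecord is a hypothetical generated class;
the real wiring lives in MergeJob):

    Job job = new Job(new Configuration());
    job.getConfiguration().setClass(MergeJob.MERGE_SQOOP_RECORD_KEY,
        FooRecord.class, SqoopRecord.class);
    job.setMapperClass(MergeTextMapper.class);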
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an import job using mysqldump in the mapper.
+ */
+public class MySQLDumpImportJob extends ImportJobBase {
+
+ public static final Log LOG =
+ LogFactory.getLog(MySQLDumpImportJob.class.getName());
+
+ public MySQLDumpImportJob(final SqoopOptions opts, ImportJobContext context)
+ throws ClassNotFoundException {
+ super(opts, MySQLDumpMapper.class, MySQLDumpInputFormat.class,
+ RawKeyTextOutputFormat.class, context);
+ }
+
+ /**
+ * Configure the InputFormat to use for the job.
+ */
+ @Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+
+ if (null == tableName) {
+ LOG.error(
+ "mysqldump-based import cannot support free-form query imports.");
+ LOG.error("Do not use --direct and --query together for MySQL.");
+ throw new IOException("null tableName for MySQLDumpImportJob.");
+ }
+
+ ConnManager mgr = getContext().getConnManager();
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(), options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(), options.getConnectString(), username,
+ options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ String [] sqlColNames = null;
+ if (null != colNames) {
+ sqlColNames = new String[colNames.length];
+ for (int i = 0; i < colNames.length; i++) {
+ sqlColNames[i] = mgr.escapeColName(colNames[i]);
+ }
+ }
+
+ // It's OK if the WHERE clause is null in DBInputFormat.setInput.
+ String whereClause = options.getWhereClause();
+
+ // We can't set the class properly in here, because we may not have the
+ // jar loaded in this JVM. So we start by calling setInput() with
+ // DBWritable and then overriding the string manually.
+
+ // Note that mysqldump also does *not* want a quoted table name.
+ DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+ tableName, whereClause,
+ mgr.escapeColName(splitByCol), sqlColNames);
+
+ Configuration conf = job.getConfiguration();
+ conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+ options.getOutputFieldDelim());
+ conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+ options.getOutputRecordDelim());
+ conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+ options.getOutputEnclosedBy());
+ conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+ options.getOutputEscapedBy());
+ conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
+ options.isOutputEncloseRequired());
+ String [] extraArgs = options.getExtraArgs();
+ if (null != extraArgs) {
+ conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
+ }
+
+ LOG.debug("Using InputFormat: " + inputFormatClass);
+ job.setInputFormatClass(getInputFormatClass());
+ }
+
+ /**
+ * Set the mapper class implementation to use in the job,
+ * as well as any related configuration (e.g., map output types).
+ */
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+ job.setMapperClass(getMapperClass());
+ job.setOutputKeyClass(String.class);
+ job.setOutputValueClass(NullWritable.class);
+ }
+
+}
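The MySQLUtils.* keys written above are consumed on the map side, where
the mysqldump output is re-parsed and re-delimited. A sketch of the
consuming end (the default values shown are illustrative, not the
mapper's actual ones):

    Configuration conf = context.getConfiguration();
    char fieldDelim = (char) conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, ',');
    char recordDelim = (char) conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, '\n');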
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * InputFormat designed to take data-driven splits and feed them to a mysqldump
+ * invocation running in the mapper.
+ *
+ * The key emitted to each mapper is a WHERE clause bounding its split,
+ * for use in the mysqldump command.
+ */
+public class MySQLDumpInputFormat extends DataDrivenDBInputFormat {
+
+ public static final Log LOG = LogFactory.getLog(
+ MySQLDumpInputFormat.class.getName());
+
+ /**
+ * A RecordReader that just takes the WHERE conditions from the DBInputSplit
+ * and presents them to the mapper as a single input record.
+ */
+ public static class MySQLDumpRecordReader
+ extends RecordReader<String, NullWritable> {
+
+ private boolean delivered;
+ private String clause;
+
+ public MySQLDumpRecordReader(InputSplit split) {
+ initialize(split, null);
+ }
+
+ @Override
+ public boolean nextKeyValue() {
+ boolean hasNext = !delivered;
+ delivered = true;
+ return hasNext;
+ }
+
+ @Override
+ public String getCurrentKey() {
+ return clause;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public float getProgress() {
+ return delivered ? 1.0f : 0.0f;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
+ (DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;
+
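+ // Conjoin the split's lower- and upper-bound conditions into one clause.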
+ this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
+ + dbSplit.getUpperClause() + ")";
+ }
+ }
+
+ @Override
+ public RecordReader<String, NullWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new MySQLDumpRecordReader(split);
+ }
+
+}
+
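Concretely, the composed key is just the conjunction of the split's two
bound clauses. A sketch (the bound clauses shown are illustrative):

    InputSplit split = new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        "`id` >= 0", "`id` < 5000");
    MySQLDumpInputFormat.MySQLDumpRecordReader rr =
        new MySQLDumpInputFormat.MySQLDumpRecordReader(split);
    rr.nextKeyValue();
    rr.getCurrentKey();   // "(`id` >= 0) AND (`id` < 5000)"

MySQLDumpMapper can then pass this key along as the WHERE condition of
its mysqldump invocation (mysqldump accepts one via --where).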