Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC
svn commit: r1190489 [4/6] - in /incubator/sqoop/trunk/src/java:
com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.util.LoggingUtils;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Abstract RecordWriter base class that buffers SqoopRecords, injects them
+ * into JDBC PreparedStatements, and hands those statements off to the
+ * AsyncSqlOutputFormat's background thread for execution.
+ *
+ * Record objects are buffered before actually performing the INSERT
+ * statements; this requires that the key implement the SqoopRecord interface.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V>
+ extends RecordWriter<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(AsyncSqlRecordWriter.class);
+
+ private Connection connection;
+
+ private Configuration conf;
+
+ protected final int rowsPerStmt; // rows to insert per statement.
+
+ // Buffer for records to be put into export SQL statements.
+ private List<SqoopRecord> records;
+
+ // Background thread to actually perform the updates.
+ private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
+ private boolean startedExecThread;
+
+ public AsyncSqlRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ this.conf = context.getConfiguration();
+
+ this.rowsPerStmt = conf.getInt(
+ AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
+ AsyncSqlOutputFormat.DEFAULT_RECORDS_PER_STATEMENT);
+ int stmtsPerTx = conf.getInt(
+ AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY,
+ AsyncSqlOutputFormat.DEFAULT_STATEMENTS_PER_TRANSACTION);
+
+ DBConfiguration dbConf = new DBConfiguration(conf);
+ this.connection = dbConf.getConnection();
+ this.connection.setAutoCommit(false);
+
+ this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
+
+ this.execThread = new AsyncSqlOutputFormat.AsyncSqlExecThread(
+ connection, stmtsPerTx);
+ this.execThread.setDaemon(true);
+ this.startedExecThread = false;
+ }
+
+ /**
+ * Allow subclasses access to the Connection instance we hold.
+ * This Connection is shared with the asynchronous SQL exec thread.
+ * Any uses of the Connection must be synchronized on it.
+ * @return the Connection object used for this SQL transaction.
+ */
+ protected final Connection getConnection() {
+ return this.connection;
+ }
+
+ /**
+ * Allow subclasses access to the Configuration.
+   * @return the Configuration for this MapReduce task.
+ */
+ protected final Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+   * Return true if the PreparedStatements generated by this RecordWriter
+   * should be executed in JDBC "batch" mode (one addBatch() call per
+   * record), or false if each statement is a single multi-row statement.
+ */
+ protected boolean isBatchExec() {
+ return false;
+ }
+
+ /**
+ * Generate the PreparedStatement object that will be fed into the execution
+ * thread. All parameterized fields of the PreparedStatement must be set in
+ * this method as well; this is usually based on the records collected from
+ * the user in the userRecords list.
+ *
+ * Note that any uses of the Connection object here must be synchronized on
+ * the Connection.
+ *
+ * @param userRecords a list of records that should be injected into SQL
+ * statements.
+ * @return a PreparedStatement to be populated with rows
+ * from the collected record list.
+ */
+ protected abstract PreparedStatement getPreparedStatement(
+ List<SqoopRecord> userRecords) throws SQLException;
+
+ /**
+ * Takes the current contents of 'records' and formats and executes the
+ * INSERT statement.
+   * @param commit if true, commits the current transaction after the
+   * statement executes.
+   * @param stopThread if true, signals the background exec thread to stop
+   * once it has processed this operation.
+ */
+ private void execUpdate(boolean commit, boolean stopThread)
+ throws InterruptedException, SQLException {
+
+ if (!startedExecThread) {
+ this.execThread.start();
+ this.startedExecThread = true;
+ }
+
+ PreparedStatement stmt = null;
+ boolean successfulPut = false;
+ try {
+ if (records.size() > 0) {
+ stmt = getPreparedStatement(records);
+ this.records.clear();
+ }
+
+ // Pass this operation off to the update thread. This will block if
+ // the update thread is already performing an update.
+ AsyncSqlOutputFormat.AsyncDBOperation op =
+ new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(),
+ commit, stopThread);
+ execThread.put(op);
+ successfulPut = true; // op has been posted to the other thread.
+ } finally {
+ if (!successfulPut && null != stmt) {
+ // We created a statement but failed to enqueue it. Close it.
+ stmt.close();
+ }
+ }
+
+ // Check for any previous SQLException. If one happened, rethrow it here.
+ SQLException lastException = execThread.getLastError();
+ if (null != lastException) {
+ LoggingUtils.logAll(LOG, lastException);
+ throw lastException;
+ }
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ try {
+ try {
+ execUpdate(true, true);
+ execThread.join();
+ } catch (SQLException sqle) {
+ throw new IOException(sqle);
+ }
+
+ // If we're not leaving on an error return path already,
+ // now that execThread is definitely stopped, check that the
+ // error slot remains empty.
+ SQLException lastErr = execThread.getLastError();
+ if (null != lastErr) {
+ throw new IOException(lastErr);
+ }
+ } finally {
+ try {
+ closeConnection(context);
+ } catch (SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
+ }
+
+ public void closeConnection(TaskAttemptContext context)
+ throws SQLException {
+ this.connection.close();
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void write(K key, V value)
+ throws InterruptedException, IOException {
+ try {
+ records.add((SqoopRecord) key.clone());
+ if (records.size() >= this.rowsPerStmt) {
+ execUpdate(false, false);
+ }
+ } catch (CloneNotSupportedException cnse) {
+ throw new IOException("Could not buffer record", cnse);
+ } catch (SQLException sqlException) {
+ throw new IOException(sqlException);
+ }
+ }
+}
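
A concrete subclass only needs to supply getPreparedStatement() (and
optionally isBatchExec()). The following sketch is not part of this commit;
the class name and two-column target table are hypothetical, and it mirrors
the batch-mode pattern used by ExportBatchOutputFormat later in this commit:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import com.cloudera.sqoop.lib.SqoopRecord;

    public class SimpleBatchRecordWriter<K extends SqoopRecord, V>
        extends AsyncSqlRecordWriter<K, V> {

      public SimpleBatchRecordWriter(TaskAttemptContext context)
          throws ClassNotFoundException, SQLException {
        super(context);
      }

      @Override
      protected boolean isBatchExec() {
        return true; // execute via JDBC batching, one addBatch() per record.
      }

      @Override
      protected PreparedStatement getPreparedStatement(
          List<SqoopRecord> userRecords) throws SQLException {
        Connection conn = getConnection();
        PreparedStatement stmt;
        synchronized (conn) { // the Connection is shared with the exec thread.
          // "demo_table" is hypothetical; a real writer derives the table
          // and columns from the job configuration.
          stmt = conn.prepareStatement("INSERT INTO demo_table VALUES (?, ?)");
        }
        for (SqoopRecord record : userRecords) {
          record.write(stmt, 0); // bind this record's fields at offset 0.
          stmt.addBatch();
        }
        return stmt;
      }
    }
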
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Identity mapper that continuously reports progress via a background thread.
+ */
+public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
+ extends Mapper<KEYIN, VALIN, KEYOUT, VALOUT> {
+
+ public static final Log LOG = LogFactory.getLog(
+ AutoProgressMapper.class.getName());
+
+ /**
+ * Total number of millis for which progress will be reported by the
+ * auto-progress thread. If this is zero, then the auto-progress thread will
+ * never voluntarily exit.
+ */
+ private int maxProgressPeriod;
+
+ /**
+   * Number of milliseconds to sleep between loop iterations. Must be less
+   * than the report interval.
+ */
+ private int sleepInterval;
+
+ /**
+ * Number of milliseconds between calls to Reporter.progress().
+ * Should be a multiple of the sleepInterval.
+ */
+ private int reportInterval;
+
+ public static final String MAX_PROGRESS_PERIOD_KEY =
+ "sqoop.mapred.auto.progress.max";
+ public static final String SLEEP_INTERVAL_KEY =
+ "sqoop.mapred.auto.progress.sleep";
+ public static final String REPORT_INTERVAL_KEY =
+ "sqoop.mapred.auto.progress.report";
+
+ // Sleep for 10 seconds at a time.
+ public static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+ // Report progress every 30 seconds.
+ public static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+ // Disable max progress, by default.
+ public static final int DEFAULT_MAX_PROGRESS = 0;
+
+ private class ProgressThread extends Thread {
+
+ private volatile boolean keepGoing; // While this is true, thread runs.
+
+ private Context context;
+ private long startTimeMillis;
+ private long lastReportMillis;
+
+ public ProgressThread(final Context ctxt) {
+ this.context = ctxt;
+ this.keepGoing = true;
+ }
+
+ public void signalShutdown() {
+ this.keepGoing = false; // volatile update.
+ this.interrupt();
+ }
+
+ public void run() {
+ this.lastReportMillis = System.currentTimeMillis();
+ this.startTimeMillis = this.lastReportMillis;
+
+ final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
+ final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
+ final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
+
+ // In a loop:
+ // * Check that we haven't run for too long (maxProgressPeriod).
+ // * If it's been a report interval since we last made progress,
+ // make more.
+ // * Sleep for a bit.
+ // * If the parent thread has signaled for exit, do so.
+ while (this.keepGoing) {
+ long curTimeMillis = System.currentTimeMillis();
+
+ if (MAX_PROGRESS != 0
+ && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
+ this.keepGoing = false;
+ LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS
+ + " ms.");
+ break;
+ }
+
+ if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+ // It's been a full report interval -- claim progress.
+ LOG.debug("Auto-progress thread reporting progress");
+ this.context.progress();
+ this.lastReportMillis = curTimeMillis;
+ }
+
+ // Unless we got an interrupt while we were working,
+ // sleep a bit before doing more work.
+ if (!Thread.interrupted()) {
+ try {
+ Thread.sleep(SLEEP_INTERVAL);
+ } catch (InterruptedException ie) {
+ // we were notified on something; not necessarily an error.
+ }
+ }
+ }
+
+ LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
+ }
+ }
+
+ /**
+ * Set configuration parameters for the auto-progress thread.
+ */
+ private void configureAutoProgress(Configuration job) {
+ this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
+ DEFAULT_MAX_PROGRESS);
+ this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
+ DEFAULT_SLEEP_INTERVAL);
+ this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
+ DEFAULT_REPORT_INTERVAL);
+
+ if (this.reportInterval < 1) {
+ LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
+ + DEFAULT_REPORT_INTERVAL);
+ this.reportInterval = DEFAULT_REPORT_INTERVAL;
+ }
+
+ if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+ LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
+ + DEFAULT_SLEEP_INTERVAL);
+ this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+ }
+
+ if (this.maxProgressPeriod < 0) {
+ LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
+ + DEFAULT_MAX_PROGRESS);
+ this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+ }
+ }
+
+
+ // map() method intentionally omitted; Mapper.map() is the identity mapper.
+
+
+ /**
+ * Run the mapping process for this task, wrapped in an auto-progress system.
+ */
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ configureAutoProgress(context.getConfiguration());
+ ProgressThread thread = this.new ProgressThread(context);
+
+ try {
+ thread.setDaemon(true);
+ thread.start();
+
+ // use default run() method to actually drive the mapping.
+ super.run(context);
+ } finally {
+      // Tell the progress thread to exit.
+ LOG.debug("Instructing auto-progress thread to quit.");
+ thread.signalShutdown();
+ try {
+ // And wait for that to happen.
+ LOG.debug("Waiting for progress thread shutdown...");
+ thread.join();
+ LOG.debug("Progress thread shutdown detected.");
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted when waiting on auto-progress thread: "
+ + ie.toString());
+ }
+ }
+ }
+}
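
The three configuration keys above can be tuned per job. A minimal sketch
with illustrative values (the defaults are 30000, 10000 and 0, as defined
above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.AutoProgressMapper;

    public class ProgressTuning {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Report progress every 15s, waking every 5s to check.
        conf.setInt(AutoProgressMapper.REPORT_INTERVAL_KEY, 15000);
        conf.setInt(AutoProgressMapper.SLEEP_INTERVAL_KEY, 5000);
        // Let the thread exit on its own after 10 minutes.
        conf.setInt(AutoProgressMapper.MAX_PROGRESS_PERIOD_KEY, 600000);
      }
    }
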
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import com.cloudera.sqoop.orm.ClassWriter;
+
+/**
+ * Exports records from an Avro data file.
+ */
+public class AvroExportMapper
+ extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable,
+ SqoopRecord, NullWritable> {
+
+ private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+
+ private static final String TIME_TYPE = "java.sql.Time";
+
+ private static final String DATE_TYPE = "java.sql.Date";
+
+ private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+
+ public static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map";
+
+ private MapWritable columnTypes;
+ private SqoopRecord recordImpl;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ // Instantiate a copy of the user's class to hold and parse the record.
+ String recordClassName = conf.get(
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+ if (null == recordClassName) {
+ throw new IOException("Export table class name ("
+ + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ + ") is not set!");
+ }
+
+ try {
+ Class cls = Class.forName(recordClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ if (null == recordImpl) {
+ throw new IOException("Could not instantiate object of type "
+ + recordClassName);
+ }
+
+ columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
+ MapWritable.class);
+ }
+
+ @Override
+ protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
+ Context context) throws IOException, InterruptedException {
+ context.write(toSqoopRecord(key.datum()), NullWritable.get());
+ }
+
+ private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
+ Schema avroSchema = record.getSchema();
+ for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
+ String columnName = e.getKey().toString();
+ String columnType = e.getValue().toString();
+ String cleanedCol = ClassWriter.toIdentifier(columnName);
+ Field field = getField(avroSchema, cleanedCol, record);
+ if (field == null) {
+ throw new IOException("Cannot find field " + cleanedCol
+ + " in Avro schema " + avroSchema);
+ } else {
+ Object avroObject = record.get(field.name());
+ Object fieldVal = fromAvro(avroObject, field.schema(), columnType);
+ recordImpl.setField(cleanedCol, fieldVal);
+ }
+ }
+ return recordImpl;
+ }
+
+ private Field getField(Schema avroSchema, String fieldName,
+ GenericRecord record) {
+ for (Field field : avroSchema.getFields()) {
+ if (field.name().equalsIgnoreCase(fieldName)) {
+ return field;
+ }
+ }
+ return null;
+ }
+
+ private Object fromAvro(Object avroObject, Schema fieldSchema,
+ String columnType) {
+ // map from Avro type to Sqoop's Java representation of the SQL type
+ // see SqlManager#toJavaType
+
+ if (avroObject == null) {
+ return null;
+ }
+
+ switch (fieldSchema.getType()) {
+ case NULL:
+ return null;
+ case BOOLEAN:
+ case INT:
+ case FLOAT:
+ case DOUBLE:
+ return avroObject;
+ case LONG:
+ if (columnType.equals(DATE_TYPE)) {
+ return new Date((Long) avroObject);
+ } else if (columnType.equals(TIME_TYPE)) {
+ return new Time((Long) avroObject);
+ } else if (columnType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp((Long) avroObject);
+ }
+ return avroObject;
+ case BYTES:
+ ByteBuffer bb = (ByteBuffer) avroObject;
+ BytesWritable bw = new BytesWritable();
+ bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+ return bw;
+ case STRING:
+ if (columnType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(avroObject.toString());
+ } else if (columnType.equals(DATE_TYPE)) {
+ return Date.valueOf(avroObject.toString());
+ } else if (columnType.equals(TIME_TYPE)) {
+ return Time.valueOf(avroObject.toString());
+ } else if (columnType.equals(TIMESTAMP_TYPE)) {
+ return Timestamp.valueOf(avroObject.toString());
+ }
+ return avroObject.toString();
+ case ENUM:
+ return ((GenericEnumSymbol) avroObject).toString();
+ case UNION:
+ List<Schema> types = fieldSchema.getTypes();
+ if (types.size() != 2) {
+ throw new IllegalArgumentException("Only support union with null");
+ }
+ Schema s1 = types.get(0);
+ Schema s2 = types.get(1);
+ if (s1.getType() == Schema.Type.NULL) {
+ return fromAvro(avroObject, s2, columnType);
+ } else if (s2.getType() == Schema.Type.NULL) {
+ return fromAvro(avroObject, s1, columnType);
+ } else {
+ throw new IllegalArgumentException("Only support union with null");
+ }
+ case FIXED:
+ return new BytesWritable(((GenericFixed) avroObject).bytes());
+ case RECORD:
+ case ARRAY:
+ case MAP:
+ default:
+ throw new IllegalArgumentException("Cannot convert Avro type "
+ + fieldSchema.getType());
+ }
+ }
+
+}
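
The UNION branch of fromAvro() only accepts the two-branch nullable pattern,
which is how nullable SQL columns are encoded in Avro. A standalone sketch of
such a schema (not part of this commit):

    import java.util.Arrays;
    import org.apache.avro.Schema;

    public class NullableColumnSchema {
      public static void main(String[] args) {
        // A nullable BIGINT column arrives as the union ["null","long"];
        // fromAvro() unwraps the non-null branch and, when the Sqoop column
        // type is java.sql.Timestamp, converts the long to a Timestamp.
        Schema nullableLong = Schema.createUnion(Arrays.asList(
            Schema.create(Schema.Type.NULL),
            Schema.create(Schema.Type.LONG)));
        System.out.println(nullableLong); // ["null","long"]
      }
    }
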
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by transforming them to Avro records in an Avro data file.
+ */
+public class AvroImportMapper
+ extends AutoProgressMapper<LongWritable, SqoopRecord,
+ AvroWrapper<GenericRecord>, NullWritable> {
+
+ private final AvroWrapper<GenericRecord> wrapper =
+ new AvroWrapper<GenericRecord>();
+ private Schema schema;
+
+ @Override
+ protected void setup(Context context) {
+ schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+ }
+
+ @Override
+ protected void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ wrapper.datum(toGenericRecord(val));
+ context.write(wrapper, NullWritable.get());
+ }
+
+
+ private GenericRecord toGenericRecord(SqoopRecord val) {
+ Map<String, Object> fieldMap = val.getFieldMap();
+ GenericRecord record = new GenericData.Record(schema);
+ for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+ record.put(entry.getKey(), toAvro(entry.getValue()));
+ }
+ return record;
+ }
+
+ /**
+   * Convert a Java value (one that has already been converted from its SQL
+   * equivalent) to its Avro representation.
+   * @param o the Java value to convert.
+   * @return the Avro-compatible equivalent of the value.
+ */
+ private Object toAvro(Object o) {
+ if (o instanceof BigDecimal) {
+ return o.toString();
+ } else if (o instanceof Date) {
+ return ((Date) o).getTime();
+ } else if (o instanceof Time) {
+ return ((Time) o).getTime();
+ } else if (o instanceof Timestamp) {
+ return ((Timestamp) o).getTime();
+ } else if (o instanceof BytesWritable) {
+ BytesWritable bw = (BytesWritable) o;
+ return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
+ } else if (o instanceof ClobRef) {
+ throw new UnsupportedOperationException("ClobRef not suported");
+ } else if (o instanceof BlobRef) {
+ throw new UnsupportedOperationException("BlobRef not suported");
+ }
+ // primitive types (Integer, etc) are left unchanged
+ return o;
+ }
+
+
+}
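
The conversions in toAvro() are value-level: temporal values become epoch
millis and binary values become ByteBuffers over the valid region of the
BytesWritable. A standalone sketch of the two non-trivial cases:

    import java.nio.ByteBuffer;
    import java.sql.Timestamp;
    import org.apache.hadoop.io.BytesWritable;

    public class ToAvroDemo {
      public static void main(String[] args) {
        // Temporal types are stored as epoch milliseconds (an Avro long).
        long millis = new Timestamp(System.currentTimeMillis()).getTime();

        // Binary columns wrap only the valid bytes of the backing array,
        // since BytesWritable.getBytes() may be padded beyond getLength().
        BytesWritable bw = new BytesWritable(new byte[] {1, 2, 3});
        ByteBuffer buf = ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
        System.out.println(millis + " / " + buf.remaining() + " bytes");
      }
    }
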
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
+public class AvroInputFormat<T>
+ extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(job)) {
+ if (file.getPath().getName().endsWith(
+ org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+ result.add(file);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ context.setStatus(split.toString());
+ return new AvroRecordReader<T>();
+ }
+
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Helper class for setting up an Avro MapReduce job.
+ */
+public final class AvroJob {
+ public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+
+ private AvroJob() {
+ }
+
+ public static void setMapOutputSchema(Configuration job, Schema s) {
+ job.set(MAP_OUTPUT_SCHEMA, s.toString());
+ }
+
+  /** Return a job's map output schema. */
+ public static Schema getMapOutputSchema(Configuration job) {
+ return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+ }
+}
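
Usage is a simple round trip through the Configuration. A sketch with a
hypothetical one-field record schema:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.AvroJob;

    public class AvroJobDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Schema s = Schema.parse(
            "{\"type\":\"record\",\"name\":\"r\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
        AvroJob.setMapOutputSchema(conf, s);
        // The schema is serialized as JSON and parsed back on retrieval.
        System.out.println(AvroJob.getMapOutputSchema(conf).equals(s)); // true
      }
    }
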
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
+public class AvroOutputFormat<T>
+ extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+ @Override
+ public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+
+ Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+
+ final DataFileWriter<T> WRITER =
+ new DataFileWriter<T>(new GenericDatumWriter<T>());
+
+ Path path = getDefaultWorkFile(context,
+ org.apache.avro.mapred.AvroOutputFormat.EXT);
+ WRITER.create(schema,
+ path.getFileSystem(context.getConfiguration()).create(path));
+
+ return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+ @Override
+ public void write(AvroWrapper<T> wrapper, NullWritable ignore)
+ throws IOException {
+ WRITER.append(wrapper.datum());
+ }
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ WRITER.close();
+ }
+ };
+ }
+
+}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** A {@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T>
+ extends RecordReader<AvroWrapper<T>, NullWritable> {
+
+ private FileReader<T> reader;
+ private long start;
+ private long end;
+ private AvroWrapper<T> key;
+ private NullWritable value;
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ FileSplit split = (FileSplit) genericSplit;
+ Configuration conf = context.getConfiguration();
+ SeekableInput in = new FsInput(split.getPath(), conf);
+ DatumReader<T> datumReader = new GenericDatumReader<T>();
+ this.reader = DataFileReader.openReader(in, datumReader);
+ reader.sync(split.getStart()); // sync to start
+ this.start = reader.tell();
+ this.end = split.getStart() + split.getLength();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!reader.hasNext() || reader.pastSync(end)) {
+ key = null;
+ value = null;
+ return false;
+ }
+ if (key == null) {
+ key = new AvroWrapper<T>();
+ }
+ if (value == null) {
+ value = NullWritable.get();
+ }
+ key.datum(reader.next(key.datum()));
+ return true;
+ }
+
+ @Override
+ public AvroWrapper<T> getCurrentKey() throws IOException,
+ InterruptedException {
+ return key;
+ }
+
+ @Override
+ public NullWritable getCurrentValue()
+ throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+ }
+ }
+
+ public long getPos() throws IOException {
+ return reader.tell();
+ }
+
+ @Override
+ public void close() throws IOException { reader.close(); }
+}
+
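
The reader relies on Avro's sync markers to honor split boundaries: seek to
the first marker at or after the split start, then stop once past the split
end. A standalone sketch of the same protocol against a hypothetical file
path and byte range:

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.FileReader;
    import org.apache.avro.file.SeekableInput;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class SyncProtocolDemo {
      // Reads only the records that belong to the byte range
      // [start, start + len) of one Avro data file.
      public static void readSplit(Path path, long start, long len)
          throws Exception {
        Configuration conf = new Configuration();
        SeekableInput in = new FsInput(path, conf);
        FileReader<GenericRecord> reader = DataFileReader.openReader(
            in, new GenericDatumReader<GenericRecord>());
        try {
          reader.sync(start); // skip to the first sync point >= start.
          while (reader.hasNext() && !reader.pastSync(start + len)) {
            GenericRecord rec = reader.next();
            System.out.println(rec);
          }
        } finally {
          reader.close();
        }
      }
    }
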
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * RecordReader that CombineFileRecordReader can instantiate; it translates
+ * one indexed chunk of a CombineFileSplit into a FileSplit and delegates to
+ * an underlying line- or SequenceFile-based reader.
+ */
+public class CombineShimRecordReader
+ extends RecordReader<LongWritable, Object> {
+
+ public static final Log LOG =
+ LogFactory.getLog(CombineShimRecordReader.class.getName());
+
+ private CombineFileSplit split;
+ private TaskAttemptContext context;
+ private int index;
+ private RecordReader<LongWritable, Object> rr;
+
+ /**
+ * Constructor invoked by CombineFileRecordReader that identifies part of a
+ * CombineFileSplit to use.
+ */
+ public CombineShimRecordReader(CombineFileSplit split,
+ TaskAttemptContext context, Integer index)
+ throws IOException, InterruptedException {
+ this.index = index;
+ this.split = (CombineFileSplit) split;
+ this.context = context;
+
+ createChildReader();
+ }
+
+ @Override
+ public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
+ throws IOException, InterruptedException {
+ this.split = (CombineFileSplit) curSplit;
+ this.context = curContext;
+
+ if (null == rr) {
+ createChildReader();
+ }
+
+ FileSplit fileSplit = new FileSplit(this.split.getPath(index),
+ this.split.getOffset(index), this.split.getLength(index),
+ this.split.getLocations());
+ this.rr.initialize(fileSplit, this.context);
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return rr.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (null != rr) {
+ rr.close();
+ rr = null;
+ }
+ }
+
+ @Override
+ public LongWritable getCurrentKey()
+ throws IOException, InterruptedException {
+ return rr.getCurrentKey();
+ }
+
+ @Override
+ public Object getCurrentValue()
+ throws IOException, InterruptedException {
+ return rr.getCurrentValue();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return rr.nextKeyValue();
+ }
+
+ /**
+ * Actually instantiate the user's chosen RecordReader implementation.
+ */
+ @SuppressWarnings("unchecked")
+ private void createChildReader() throws IOException, InterruptedException {
+ LOG.debug("ChildSplit operates on: " + split.getPath(index));
+
+ Configuration conf = context.getConfiguration();
+
+ // Determine the file format we're reading.
+ Class rrClass;
+ if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
+ rrClass = SequenceFileRecordReader.class;
+ } else {
+ rrClass = LineRecordReader.class;
+ }
+
+ // Create the appropriate record reader.
+ this.rr = (RecordReader<LongWritable, Object>)
+ ReflectionUtils.newInstance(rrClass, conf);
+ }
+}
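
The (split, context, Integer) constructor exists because
CombineFileRecordReader instantiates one shim per chunk reflectively. A
sketch of the input format side of that contract (the class name here is
hypothetical; the ExportInputFormat below appears to play this role in the
actual commit):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

    public class ShimCombineInputFormat
        extends CombineFileInputFormat<LongWritable, Object> {
      @Override
      public RecordReader<LongWritable, Object> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader reflectively calls
        // CombineShimRecordReader(CombineFileSplit, TaskAttemptContext,
        // Integer) once for each chunk of the combined split.
        return new CombineFileRecordReader<LongWritable, Object>(
            (CombineFileSplit) split, context, CombineShimRecordReader.class);
      }
    }
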
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.ImportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+
+/**
+ * Actually runs a JDBC import job using the ORM classes generated by the
+ * sqoop.orm package. Uses DataDrivenDBInputFormat.
+ */
+public class DataDrivenImportJob extends ImportJobBase {
+
+ public static final Log LOG = LogFactory.getLog(
+ DataDrivenImportJob.class.getName());
+
+ @SuppressWarnings("unchecked")
+ public DataDrivenImportJob(final SqoopOptions opts) {
+ super(opts, null, DataDrivenDBInputFormat.class, null, null);
+ }
+
+ public DataDrivenImportJob(final SqoopOptions opts,
+ final Class<? extends InputFormat> inputFormatClass,
+ ImportJobContext context) {
+ super(opts, null, inputFormatClass, null, context);
+ }
+
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws IOException {
+ if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+ // For text files, specify these as the output types; for
+ // other types, we just use the defaults.
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.AvroDataFile) {
+ ConnManager connManager = getContext().getConnManager();
+ AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+ connManager, tableName);
+ Schema schema = generator.generate();
+ AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
+ }
+
+ job.setMapperClass(getMapperClass());
+ }
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+ return TextImportMapper.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.SequenceFile) {
+ return SequenceFileImportMapper.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.AvroDataFile) {
+ return AvroImportMapper.class;
+ }
+
+ return null;
+ }
+
+ @Override
+ protected Class<? extends OutputFormat> getOutputFormatClass()
+ throws ClassNotFoundException {
+ if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+ return RawKeyTextOutputFormat.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.SequenceFile) {
+ return SequenceFileOutputFormat.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.AvroDataFile) {
+ return AvroOutputFormat.class;
+ }
+
+ return null;
+ }
+
+ @Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol) throws IOException {
+ ConnManager mgr = getContext().getConnManager();
+ try {
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(), options.getConnectString(),
+ options.getFetchSize());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(), options.getConnectString(),
+ username, options.getPassword(), options.getFetchSize());
+ }
+
+ if (null != tableName) {
+ // Import a table.
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ String [] sqlColNames = null;
+ if (null != colNames) {
+ sqlColNames = new String[colNames.length];
+ for (int i = 0; i < colNames.length; i++) {
+ sqlColNames[i] = mgr.escapeColName(colNames[i]);
+ }
+ }
+
+ // It's ok if the where clause is null in DBInputFormat.setInput.
+ String whereClause = options.getWhereClause();
+
+ // We can't set the class properly in here, because we may not have the
+ // jar loaded in this JVM. So we start by calling setInput() with
+ // DBWritable and then overriding the string manually.
+ DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+ mgr.escapeTableName(tableName), whereClause,
+ mgr.escapeColName(splitByCol), sqlColNames);
+
+ // If user specified boundary query on the command line propagate it to
+ // the job
+ if(options.getBoundaryQuery() != null) {
+ DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(),
+ options.getBoundaryQuery());
+ }
+ } else {
+ // Import a free-form query.
+ String inputQuery = options.getSqlQuery();
+ String sanitizedQuery = inputQuery.replace(
+ DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
+
+ String inputBoundingQuery = options.getBoundaryQuery();
+
+ if(inputBoundingQuery == null) {
+ inputBoundingQuery =
+ mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
+ if (inputBoundingQuery == null) {
+ if (splitByCol != null) {
+ inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
+ + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
+ } else {
+ inputBoundingQuery = "";
+ }
+ }
+ }
+ DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+ inputQuery, inputBoundingQuery);
+ new DBConfiguration(job.getConfiguration()).setInputOrderBy(
+ splitByCol);
+ }
+
+ LOG.debug("Using table class: " + tableClassName);
+ job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(),
+ tableClassName);
+
+ job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
+ options.getInlineLobLimit());
+
+ LOG.debug("Using InputFormat: " + inputFormatClass);
+ job.setInputFormatClass(inputFormatClass);
+ } finally {
+ try {
+ mgr.close();
+ } catch (SQLException sqlE) {
+ LOG.warn("Error closing connection: " + sqlE);
+ }
+ }
+ }
+}
+
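
For a free-form query import, the effect of the sanitizing and bounding
logic above can be seen in isolation. A sketch assuming the substitution
token is "$CONDITIONS" (the value of DataDrivenDBInputFormat.SUBSTITUTE_TOKEN)
and a hypothetical split column "id":

    public class BoundingQueryDemo {
      public static void main(String[] args) {
        String inputQuery = "SELECT * FROM sales WHERE $CONDITIONS";
        // Neutralize the token so the query can run as a subselect.
        String sanitized = inputQuery.replace("$CONDITIONS", " (1 = 1) ");
        // Fallback bounding query, used when neither the user nor the
        // connection manager supplies one.
        String bounding = "SELECT MIN(id), MAX(id) FROM ("
            + sanitized + ") AS t1";
        System.out.println(bounding);
        // SELECT MIN(id), MAX(id) FROM (
        //   SELECT * FROM sales WHERE  (1 = 1) ) AS t1
      }
    }
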
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.ProcessingException;
+
+/**
+ * OutputFormat that produces a RecordWriter which instantiates
+ * a FieldMapProcessor that will process FieldMappable
+ * output keys.
+ *
+ * <p>The output value is ignored.</p>
+ *
+ * <p>The FieldMapProcessor implementation may do any arbitrary
+ * processing on the object. For example, it may write an object
+ * to HBase, etc.</p>
+ *
+ * <p>If the FieldMapProcessor implementation also implements
+ * Closeable, it will be close()'d in the RecordWriter's close()
+ * method.</p>
+ *
+ * <p>If the FMP implements Configurable, it will be configured
+ * correctly via ReflectionUtils.</p>
+ */
+public class DelegatingOutputFormat<K extends FieldMappable, V>
+ extends OutputFormat<K, V> {
+
+ /** conf key: the FieldMapProcessor class to instantiate. */
+ public static final String DELEGATE_CLASS_KEY =
+ "sqoop.output.delegate.field.map.processor.class";
+
+ @Override
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+
+ if (null == conf.get(DELEGATE_CLASS_KEY)) {
+ throw new IOException("Delegate FieldMapProcessor class is not set.");
+ }
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NullOutputCommitter();
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new DelegatingRecordWriter(context);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+ }
+
+ /**
+   * RecordWriter that hands each output key to the configured
+   * FieldMapProcessor for arbitrary processing.
+ */
+ public class DelegatingRecordWriter extends RecordWriter<K, V> {
+
+ private Configuration conf;
+
+ private FieldMapProcessor mapProcessor;
+
+ public DelegatingRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException {
+
+ this.conf = context.getConfiguration();
+
+ @SuppressWarnings("unchecked")
+ Class<? extends FieldMapProcessor> procClass =
+ (Class<? extends FieldMapProcessor>)
+ conf.getClass(DELEGATE_CLASS_KEY, null);
+ this.mapProcessor = ReflectionUtils.newInstance(procClass, this.conf);
+ }
+
+ protected Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ if (mapProcessor instanceof Closeable) {
+ ((Closeable) mapProcessor).close();
+ }
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void write(K key, V value)
+ throws InterruptedException, IOException {
+ try {
+ mapProcessor.accept(key);
+ } catch (ProcessingException pe) {
+ throw new IOException(pe);
+ }
+ }
+ }
+}
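
A delegate only has to implement FieldMapProcessor.accept(). A minimal
sketch (the class is hypothetical, and the accept() signature is assumed
from com.cloudera.sqoop.lib.FieldMapProcessor):

    import java.io.IOException;
    import com.cloudera.sqoop.lib.FieldMappable;
    import com.cloudera.sqoop.lib.FieldMapProcessor;
    import com.cloudera.sqoop.lib.ProcessingException;

    public class LoggingFieldMapProcessor implements FieldMapProcessor {
      @Override
      public void accept(FieldMappable record)
          throws IOException, ProcessingException {
        // Arbitrary per-record processing; here, just print the field map.
        System.out.println(record.getFieldMap());
      }
    }

    // Wired in via the configuration key defined above:
    //   conf.set(DelegatingOutputFormat.DELEGATE_CLASS_KEY,
    //       LoggingFieldMapProcessor.class.getName());
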
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+
+/**
+ * This class uses batch mode to execute underlying statements instead of
+ * using a single multirow insert statement as its superclass.
+ */
+public class ExportBatchOutputFormat<K extends SqoopRecord, V>
+ extends ExportOutputFormat<K, V> {
+
+ private static final Log LOG =
+ LogFactory.getLog(ExportBatchOutputFormat.class);
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new ExportBatchRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to a row in a database table.
+ * The actual database updates are executed in a second thread.
+ */
+ public class ExportBatchRecordWriter extends ExportRecordWriter {
+
+ public ExportBatchRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ protected boolean isBatchExec() {
+ // We use batches here.
+ return true;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ protected PreparedStatement getPreparedStatement(
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ PreparedStatement stmt = null;
+
+ // Synchronize on connection to ensure this does not conflict
+ // with the operations in the update thread.
+ Connection conn = getConnection();
+ synchronized (conn) {
+ stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+ }
+
+ // Inject the record parameters into the VALUES clauses.
+ for (SqoopRecord record : userRecords) {
+ record.write(stmt, 0);
+ stmt.addBatch();
+ }
+
+ return stmt;
+ }
+
+    /**
+     * @return a single-row INSERT statement. The numRows argument is
+     * ignored here, since rows are accumulated via addBatch() rather than
+     * a multi-row VALUES list.
+     */
+ protected String getInsertStatement(int numRows) {
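+      // For example, with columns (id, name) this produces
+      // "INSERT INTO <table> (id, name) VALUES (?, ?)"; the same
+      // single-row statement is reused for every record via addBatch().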
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO " + tableName + " ");
+
+ int numSlots;
+ if (this.columnNames != null) {
+ numSlots = this.columnNames.length;
+
+ sb.append("(");
+ boolean first = true;
+ for (String col : columnNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ first = false;
+ }
+
+ sb.append(") ");
+ } else {
+ numSlots = this.columnCount; // set if columnNames is null.
+ }
+
+ sb.append("VALUES ");
+
+      // Generate the (?, ?, ?...) placeholder group for the single row.
+ sb.append("(");
+ for (int i = 0; i < numSlots; i++) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+
+ sb.append("?");
+ }
+ sb.append(")");
+
+ return sb.toString();
+ }
+ }
+}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * InputFormat that generates a user-defined number of splits over the input
+ * files whose records will be injected into the database.
+ */
+public class ExportInputFormat
+ extends CombineFileInputFormat<LongWritable, Object> {
+
+ public static final Log LOG =
+ LogFactory.getLog(ExportInputFormat.class.getName());
+
+ public static final int DEFAULT_NUM_MAP_TASKS = 4;
+
+ public ExportInputFormat() {
+ }
+
+ /**
+ * @return the number of bytes across all files in the job.
+ */
+ private long getJobSize(JobContext job) throws IOException {
+ List<FileStatus> stats = listStatus(job);
+ long count = 0;
+ for (FileStatus stat : stats) {
+ count += stat.getLen();
+ }
+
+ return count;
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ // Set the max split size based on the number of map tasks we want.
+ long numTasks = getNumMapTasks(job);
+ long numFileBytes = getJobSize(job);
+ long maxSplitSize = numFileBytes / numTasks;
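+    // For example, 1 GB of input with the default of 4 map tasks yields a
+    // 256 MB maximum split size, i.e. roughly one split per task. (If the
+    // input is smaller than numTasks bytes, this computes a max size of 0,
+    // which CombineFileInputFormat treats as "no upper bound".)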
+
+ setMaxSplitSize(maxSplitSize);
+
+ LOG.debug("Target numMapTasks=" + numTasks);
+ LOG.debug("Total input bytes=" + numFileBytes);
+ LOG.debug("maxSplitSize=" + maxSplitSize);
+
+ List<InputSplit> splits = super.getSplits(job);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated splits:");
+ for (InputSplit split : splits) {
+ LOG.debug(" " + split);
+ }
+ }
+ return splits;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordReader createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+
+ CombineFileSplit combineSplit = (CombineFileSplit) split;
+
+    // Use CombineFileRecordReader, which handles a CombineFileSplit by
+    // instantiating a child RecordReader for each chunk of the split in a
+    // loop; here that child reader is CombineShimRecordReader.
+ RecordReader rr = new CombineFileRecordReader(combineSplit, context,
+ CombineShimRecordReader.class);
+
+ return rr;
+ }
+
+ /**
+ * Allows the user to control the number of map tasks used for this
+ * export job.
+ */
+ public static void setNumMapTasks(JobContext job, int numTasks) {
+ job.getConfiguration().setInt(ExportJobBase.EXPORT_MAP_TASKS_KEY, numTasks);
+ }
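+
+  // A minimal usage sketch (variable names are illustrative; Sqoop's job
+  // setup code is the real caller):
+  //   Job job = new Job(new Configuration());
+  //   job.setInputFormatClass(ExportInputFormat.class);
+  //   ExportInputFormat.setNumMapTasks(job, 8); // request 8 export mappers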
+
+ /**
+ * @return the number of map tasks to use in this export job.
+ */
+ public static int getNumMapTasks(JobContext job) {
+ return job.getConfiguration().getInt(ExportJobBase.EXPORT_MAP_TASKS_KEY,
+ DEFAULT_NUM_MAP_TASKS);
+ }
+
+}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.util.PerfCounters;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Base class for running an export MapReduce job.
+ */
+public class ExportJobBase extends JobBase {
+
+ /**
+ * The (inferred) type of a file or group of files.
+ */
+ public enum FileType {
+ SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+ }
+
+ public static final Log LOG = LogFactory.getLog(
+ ExportJobBase.class.getName());
+
+ /** What SqoopRecord class to use to read a record for export. */
+ public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
+ "sqoop.mapreduce.export.table.class";
+
+ /**
+ * What column of the table to use for the WHERE clause of
+ * an updating export.
+ */
+ public static final String SQOOP_EXPORT_UPDATE_COL_KEY =
+ "sqoop.mapreduce.export.update.col";
+
+ /** Number of map tasks to use for an export. */
+ public static final String EXPORT_MAP_TASKS_KEY =
+ "sqoop.mapreduce.export.map.tasks";
+
+ protected ExportJobContext context;
+
+ public ExportJobBase(final ExportJobContext ctxt) {
+ this(ctxt, null, null, null);
+ }
+
+ public ExportJobBase(final ExportJobContext ctxt,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
+ this.context = ctxt;
+ }
+
+ /**
+ * @return true if p is a SequenceFile, or a directory containing
+ * SequenceFiles.
+ */
+ public static boolean isSequenceFiles(Configuration conf, Path p)
+ throws IOException {
+ return getFileType(conf, p) == FileType.SEQUENCE_FILE;
+ }
+
+ /**
+   * @return the type of the file represented by p (or the files in p, if a
+   * directory).
+ */
+ public static FileType getFileType(Configuration conf, Path p)
+ throws IOException {
+ FileSystem fs = p.getFileSystem(conf);
+
+ try {
+ FileStatus stat = fs.getFileStatus(p);
+
+ if (null == stat) {
+ // Couldn't get the item.
+ LOG.warn("Input path " + p + " does not exist");
+ return FileType.UNKNOWN;
+ }
+
+ if (stat.isDir()) {
+ FileStatus [] subitems = fs.listStatus(p);
+ if (subitems == null || subitems.length == 0) {
+ LOG.warn("Input path " + p + " contains no files");
+ return FileType.UNKNOWN; // empty dir.
+ }
+
+ // Pick a child entry to examine instead.
+ boolean foundChild = false;
+ for (int i = 0; i < subitems.length; i++) {
+ stat = subitems[i];
+ if (!stat.isDir() && !stat.getPath().getName().startsWith("_")) {
+ foundChild = true;
+ break; // This item is a visible file. Check it.
+ }
+ }
+
+ if (!foundChild) {
+ stat = null; // Couldn't find a reasonable candidate.
+ }
+ }
+
+ if (null == stat) {
+ LOG.warn("null FileStatus object in isSequenceFiles(); "
+ + "assuming false.");
+ return FileType.UNKNOWN;
+ }
+
+ Path target = stat.getPath();
+ return fromMagicNumber(target, conf);
+ } catch (FileNotFoundException fnfe) {
+ LOG.warn("Input path " + p + " does not exist");
+ return FileType.UNKNOWN; // doesn't exist!
+ }
+ }
+
+  /**
+   * @param file a file to test.
+   * @param conf the configuration used to access the file.
+   * @return the FileType inferred from the file's magic number, or UNKNOWN
+   * if the header cannot be read or is not recognized.
+   */
+ private static FileType fromMagicNumber(Path file, Configuration conf) {
+ // Test target's header to see if it contains magic numbers indicating its
+ // file type
+ byte [] header = new byte[3];
+ FSDataInputStream is = null;
+ try {
+ FileSystem fs = file.getFileSystem(conf);
+ is = fs.open(file);
+ is.readFully(header);
+ } catch (IOException ioe) {
+ // Error reading header or EOF; assume unknown
+ LOG.warn("IOException checking input file header: " + ioe);
+ return FileType.UNKNOWN;
+ } finally {
+ try {
+ if (null != is) {
+ is.close();
+ }
+ } catch (IOException ioe) {
+ // ignore; closing.
+ LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
+ }
+ }
+
+ if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
+ return FileType.SEQUENCE_FILE;
+ }
+ if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
+ return FileType.AVRO_DATA_FILE;
+ }
+ return FileType.UNKNOWN;
+ }
+
+ /**
+ * @return the Path to the files we are going to export to the db.
+ */
+ protected Path getInputPath() throws IOException {
+ Path inputPath = new Path(context.getOptions().getExportDir());
+ Configuration conf = options.getConf();
+ inputPath = inputPath.makeQualified(FileSystem.get(conf));
+ return inputPath;
+ }
+
+ @Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+ FileInputFormat.addInputPath(job, getInputPath());
+ }
+
+ @Override
+ protected Class<? extends InputFormat> getInputFormatClass()
+ throws ClassNotFoundException {
+ Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
+ if (null == configuredIF) {
+ return ExportInputFormat.class;
+ } else {
+ return configuredIF;
+ }
+ }
+
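+  // Output format selection: when the user enables batch mode (the --batch
+  // export option), ExportBatchOutputFormat uses JDBC batch execution;
+  // otherwise the multi-row INSERT based ExportOutputFormat is the default.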
+ @Override
+ protected Class<? extends OutputFormat> getOutputFormatClass()
+ throws ClassNotFoundException {
+ Class<? extends OutputFormat> configuredOF = super.getOutputFormatClass();
+ if (null == configuredOF) {
+ if (!options.isBatchMode()) {
+ return ExportOutputFormat.class;
+ } else {
+ return ExportBatchOutputFormat.class;
+ }
+ } else {
+ return configuredOF;
+ }
+ }
+
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+
+ job.setMapperClass(getMapperClass());
+
+ // Concurrent writes of the same records would be problematic.
+ ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
+
+ job.setMapOutputKeyClass(SqoopRecord.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ }
+
+ @Override
+ protected int configureNumTasks(Job job) throws IOException {
+ int numMaps = super.configureNumTasks(job);
+ job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numMaps);
+ return numMaps;
+ }
+
+ @Override
+ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
+ InterruptedException {
+
+ PerfCounters perfCounters = new PerfCounters();
+ perfCounters.startClock();
+
+ boolean success = job.waitForCompletion(true);
+ perfCounters.stopClock();
+
+ Counters jobCounters = job.getCounters();
+ // If the job has been retired, these may be unavailable.
+ if (null == jobCounters) {
+ displayRetiredJobNotice(LOG);
+ } else {
+ perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
+ .findCounter("HDFS_BYTES_READ").getValue());
+ LOG.info("Transferred " + perfCounters.toString());
+ long numRecords = ConfigurationHelper.getNumMapInputRecords(job);
+ LOG.info("Exported " + numRecords + " records.");
+ }
+
+ return success;
+ }
+
+ /**
+ * Run an export job to dump a table from HDFS to a database. If a staging
+ * table is specified and the connection manager supports staging of data,
+ * the export will first populate the staging table and then migrate the
+ * data to the target table.
+ * @throws IOException if the export job encounters an IO error
+ * @throws ExportException if the job fails unexpectedly or is misconfigured.
+ */
+ public void runExport() throws ExportException, IOException {
+
+ ConnManager cmgr = context.getConnManager();
+ SqoopOptions options = context.getOptions();
+ Configuration conf = options.getConf();
+
+ String outputTableName = context.getTableName();
+ String stagingTableName = context.getOptions().getStagingTableName();
+
+ String tableName = outputTableName;
+ boolean stagingEnabled = false;
+ if (stagingTableName != null) { // user has specified the staging table
+ if (cmgr.supportsStagingForExport()) {
+ LOG.info("Data will be staged in the table: " + stagingTableName);
+ tableName = stagingTableName;
+ stagingEnabled = true;
+ } else {
+ throw new ExportException("The active connection manager ("
+ + cmgr.getClass().getCanonicalName()
+ + ") does not support staging of data for export. "
+ + "Please retry without specifying the --staging-table option.");
+ }
+ }
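+
+    // From here on, 'tableName' refers to the table the MapReduce job will
+    // write to: the staging table when staging is enabled, or the target
+    // table otherwise.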
+
+ String tableClassName =
+ new TableClassName(options).getClassForTable(outputTableName);
+ String ormJarFile = context.getJarFile();
+
+ LOG.info("Beginning export of " + outputTableName);
+ loadJars(conf, ormJarFile, tableClassName);
+
+ if (stagingEnabled) {
+ // Prepare the staging table
+ if (options.doClearStagingTable()) {
+ try {
+ // Delete all records from staging table
+ cmgr.deleteAllRecords(stagingTableName);
+ } catch (SQLException ex) {
+ throw new ExportException(
+ "Failed to empty staging table before export run", ex);
+ }
+ } else {
+ // User has not explicitly specified the clear staging table option.
+ // Assert that the staging table is empty.
+ try {
+ long rowCount = cmgr.getTableRowCount(stagingTableName);
+ if (rowCount != 0L) {
+ throw new ExportException("The specified staging table ("
+ + stagingTableName + ") is not empty. To force deletion of "
+ + "its data, please retry with --clear-staging-table option.");
+ }
+ } catch (SQLException ex) {
+ throw new ExportException(
+ "Failed to count data rows in staging table: "
+ + stagingTableName, ex);
+ }
+ }
+ }
+
+ try {
+ Job job = new Job(conf);
+
+ // Set the external jar to use for the job.
+ job.getConfiguration().set("mapred.jar", ormJarFile);
+
+ configureInputFormat(job, tableName, tableClassName, null);
+ configureOutputFormat(job, tableName, tableClassName);
+ configureMapper(job, tableName, tableClassName);
+ configureNumTasks(job);
+ cacheJars(job, context.getConnManager());
+ setJob(job);
+ boolean success = runJob(job);
+ if (!success) {
+ throw new ExportException("Export job failed!");
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ } finally {
+ unloadJars();
+ }
+
+ // Unstage the data if needed
+ if (stagingEnabled) {
+ // Migrate data from staging table to the output table
+ try {
+ LOG.info("Starting to migrate data from staging table to destination.");
+ cmgr.migrateData(stagingTableName, outputTableName);
+ } catch (SQLException ex) {
+ LOG.error("Failed to move data from staging table ("
+ + stagingTableName + ") to target table ("
+ + outputTableName + ")", ex);
+ throw new ExportException(
+ "Failed to move data from staging table", ex);
+ }
+ }
+ }
+
+ /**
+ * @return true if the input directory contains SequenceFiles.
+ * @deprecated use {@link #getInputFileType()} instead
+ */
+ @Deprecated
+ protected boolean inputIsSequenceFiles() {
+ try {
+ return isSequenceFiles(
+ context.getOptions().getConf(), getInputPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not check file format for export; assuming text");
+ return false;
+ }
+ }
+
+ protected FileType getInputFileType() {
+ try {
+ return getFileType(context.getOptions().getConf(), getInputPath());
+    } catch (IOException ioe) {
+      LOG.warn("Could not check file format for export; assuming UNKNOWN.");
+      return FileType.UNKNOWN;
+ }
+ }
+}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Insert the emitted keys as records into a database table.
+ * This supports a configurable "spill threshold" at which
+ * point intermediate transactions are committed.
+ *
+ * Record objects are buffered before actually performing the INSERT
+ * statements; this requires that the key implement the
+ * SqoopRecord interface.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public class ExportOutputFormat<K extends SqoopRecord, V>
+ extends AsyncSqlOutputFormat<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class);
+
+ @Override
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ DBConfiguration dbConf = new DBConfiguration(conf);
+
+ // Sanity check all the configuration values we need.
+ if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
+ throw new IOException("Database connection URL is not set.");
+ } else if (null == dbConf.getOutputTableName()) {
+ throw new IOException("Table name is not set for export");
+ } else if (null == dbConf.getOutputFieldNames()
+ && 0 == dbConf.getOutputFieldCount()) {
+      throw new IOException(
+          "Neither output field names nor an output field count is set.");
+ }
+ }
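+
+  // A configuration sketch that would satisfy these checks (connection
+  // values and field names are illustrative):
+  //   DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
+  //       "jdbc:mysql://db.example.com/sales", "user", "passwd");
+  //   dbConf.setOutputTableName("sales_export");
+  //   dbConf.setOutputFieldNames(new String[] { "id", "amount" });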
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new ExportRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to a row in a database table.
+ * The actual database updates are executed in a second thread.
+ */
+ public class ExportRecordWriter extends AsyncSqlRecordWriter<K, V> {
+
+ protected String tableName;
+ protected String [] columnNames; // The columns to insert into.
+    protected int columnCount; // If columnNames is null, the number of cols.
+
+ public ExportRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+
+ Configuration conf = getConf();
+
+ DBConfiguration dbConf = new DBConfiguration(conf);
+ tableName = dbConf.getOutputTableName();
+ columnNames = dbConf.getOutputFieldNames();
+ columnCount = dbConf.getOutputFieldCount();
+ }
+
+ /**
+ * @return the name of the table we are inserting into.
+ */
+ protected final String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return the list of columns we are updating.
+ */
+ protected final String [] getColumnNames() {
+ if (null == columnNames) {
+ return null;
+ } else {
+ return Arrays.copyOf(columnNames, columnNames.length);
+ }
+ }
+
+ /**
+ * @return the number of columns we are updating.
+ */
+ protected final int getColumnCount() {
+ return columnCount;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ protected PreparedStatement getPreparedStatement(
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ PreparedStatement stmt = null;
+
+ // Synchronize on connection to ensure this does not conflict
+ // with the operations in the update thread.
+ Connection conn = getConnection();
+ synchronized (conn) {
+ stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+ }
+
+ // Inject the record parameters into the VALUES clauses.
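+      // Each write() call binds one record's fields starting at 'position'
+      // and returns the number of fields written, so successive records
+      // fill consecutive '?' slots of the multi-row statement.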
+ int position = 0;
+ for (SqoopRecord record : userRecords) {
+ position += record.write(stmt, position);
+ }
+
+ return stmt;
+ }
+
+ /**
+ * @return an INSERT statement suitable for inserting 'numRows' rows.
+ */
+ protected String getInsertStatement(int numRows) {
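+      // For example, numRows=2 with columns (id, name) would produce:
+      //   INSERT INTO <table> (id, name) VALUES (?, ?), (?, ?)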
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO " + tableName + " ");
+
+ int numSlots;
+ if (this.columnNames != null) {
+ numSlots = this.columnNames.length;
+
+ sb.append("(");
+ boolean first = true;
+ for (String col : columnNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ first = false;
+ }
+
+ sb.append(") ");
+ } else {
+ numSlots = this.columnCount; // set if columnNames is null.
+ }
+
+ sb.append("VALUES ");
+
+      // Generate the (?, ?, ?...) placeholder group used for each row.
+ StringBuilder sbRow = new StringBuilder();
+ sbRow.append("(");
+ for (int i = 0; i < numSlots; i++) {
+ if (i != 0) {
+ sbRow.append(", ");
+ }
+
+ sbRow.append("?");
+ }
+ sbRow.append(")");
+
+ // Now append that numRows times.
+ for (int i = 0; i < numRows; i++) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+
+ sb.append(sbRow);
+ }
+
+ return sb.toString();
+ }
+ }
+}