You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/08/11 01:58:07 UTC

svn commit: r1156405 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/manager/ java/com/cloudera/sqoop/mapreduce/ java/com/cloudera/sqoop/orm/ test/com/cloudera/sqoop/ test/com/cloudera/sqoop/testutil/

Author: arvind
Date: Wed Aug 10 23:58:07 2011
New Revision: 1156405

URL: http://svn.apache.org/viewvc?rev=1156405&view=rev
Log:
SQOOP-305. Support export from Avro Data Files.

(Tom White via Arvind Prabhakar)

Added:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Wed Aug 10 23:58:07 2011
@@ -91,6 +91,34 @@ public abstract class ConnManager {
   public abstract Map<String, Integer> getColumnTypes(String tableName);
 
   /**
+   * Return an unordered mapping from colname to sqltype for
+   * all columns in a table or query.
+   *
+   * The Integer type id is a constant from java.sql.Types
+   *
+   * @param tableName the name of the table
+   * @param sqlQuery the SQL query to use if tableName is null
+   */
+  public Map<String, Integer> getColumnTypes(String tableName,
+      String sqlQuery) throws IOException {
+    Map<String, Integer> columnTypes;
+    if (null != tableName) {
+      // We're generating a class based on a table import.
+      columnTypes = getColumnTypes(tableName);
+    } else {
+      // This is based on an arbitrary query.
+      String query = sqlQuery;
+      if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
+        throw new IOException("Query [" + query + "] must contain '"
+            + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
+      }
+
+      columnTypes = getColumnTypesForQuery(query);
+    }
+    return columnTypes;
+  }
+
+  /**
    * This method allows various connection managers to indicate if they support
    * staging data for export jobs. The managers that do support this must
    * override this method and return <tt>true</tt>.

Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.orm.ClassWriter;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Exports records from an Avro data file.
+ */
+public class AvroExportMapper
+    extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable,
+              SqoopRecord, NullWritable> {
+
+  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+
+  private static final String TIME_TYPE = "java.sql.Time";
+
+  private static final String DATE_TYPE = "java.sql.Date";
+
+  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+
+  static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map";
+
+  private MapWritable columnTypes;
+  private SqoopRecord recordImpl;
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    // Instantiate a copy of the user's class to hold and parse the record.
+    String recordClassName = conf.get(
+        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+          + ") is not set!");
+    }
+
+    try {
+      Class cls = Class.forName(recordClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == recordImpl) {
+      throw new IOException("Could not instantiate object of type "
+          + recordClassName);
+    }
+
+    columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
+        MapWritable.class);
+  }
+
+  @Override
+  protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
+      Context context) throws IOException, InterruptedException {
+    context.write(toSqoopRecord(key.datum()), NullWritable.get());
+  }
+
+  private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
+    Schema avroSchema = record.getSchema();
+    for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
+      String columnName = e.getKey().toString();
+      String columnType = e.getValue().toString();
+      String cleanedCol = ClassWriter.toIdentifier(columnName);
+      Field field = getField(avroSchema, cleanedCol, record);
+      if (field == null) {
+        throw new IOException("Cannot find field " + cleanedCol
+          + " in Avro schema " + avroSchema);
+      } else {
+        Object avroObject = record.get(field.name());
+        Object fieldVal = fromAvro(avroObject, field.schema(), columnType);
+        recordImpl.setField(cleanedCol, fieldVal);
+      }
+    }
+    return recordImpl;
+  }
+
+  private Field getField(Schema avroSchema, String fieldName,
+      GenericRecord record) {
+    for (Field field : avroSchema.getFields()) {
+      if (field.name().equalsIgnoreCase(fieldName)) {
+        return field;
+      }
+    }
+    return null;
+  }
+
+  private Object fromAvro(Object avroObject, Schema fieldSchema,
+      String columnType) {
+    // map from Avro type to Sqoop's Java representation of the SQL type
+    // see SqlManager#toJavaType
+
+    if (avroObject == null) {
+      return null;
+    }
+
+    switch (fieldSchema.getType()) {
+      case NULL:
+        return null;
+      case BOOLEAN:
+      case INT:
+      case FLOAT:
+      case DOUBLE:
+        return avroObject;
+      case LONG:
+        if (columnType.equals(DATE_TYPE)) {
+          return new Date((Long) avroObject);
+        } else if (columnType.equals(TIME_TYPE)) {
+          return new Time((Long) avroObject);
+        } else if (columnType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp((Long) avroObject);
+        }
+        return avroObject;
+      case BYTES:
+        ByteBuffer bb = (ByteBuffer) avroObject;
+        BytesWritable bw = new BytesWritable();
+        bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+        return bw;
+      case STRING:
+        if (columnType.equals(BIG_DECIMAL_TYPE)) {
+          return new BigDecimal(avroObject.toString());
+        } else if (columnType.equals(DATE_TYPE)) {
+          return Date.valueOf(avroObject.toString());
+        } else if (columnType.equals(TIME_TYPE)) {
+          return Time.valueOf(avroObject.toString());
+        } else if (columnType.equals(TIMESTAMP_TYPE)) {
+          return Timestamp.valueOf(avroObject.toString());
+        }
+        return avroObject.toString();
+      case ENUM:
+        return ((GenericEnumSymbol) avroObject).toString();
+      case UNION:
+        List<Schema> types = fieldSchema.getTypes();
+        if (types.size() != 2) {
+          throw new IllegalArgumentException("Only support union with null");
+        }
+        Schema s1 = types.get(0);
+        Schema s2 = types.get(1);
+        if (s1.getType() == Schema.Type.NULL) {
+          return fromAvro(avroObject, s2, columnType);
+        } else if (s2.getType() == Schema.Type.NULL) {
+          return fromAvro(avroObject, s1, columnType);
+        } else {
+          throw new IllegalArgumentException("Only support union with null");
+        }
+      case FIXED:
+        return new BytesWritable(((GenericFixed) avroObject).bytes());
+      case RECORD:
+      case ARRAY:
+      case MAP:
+      default:
+        throw new IllegalArgumentException("Cannot convert Avro type "
+            + fieldSchema.getType());
+    }
+  }
+
+}

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java Wed Aug 10 23:58:07 2011
@@ -24,6 +24,7 @@ import com.cloudera.sqoop.lib.SqoopRecor
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -33,6 +34,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 
@@ -84,6 +86,9 @@ public class AvroImportMapper
       return ((Time) o).getTime();
     } else if (o instanceof Timestamp) {
       return ((Timestamp) o).getTime();
+    } else if (o instanceof BytesWritable) {
+      BytesWritable bw = (BytesWritable) o;
+      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
     } else if (o instanceof ClobRef) {
       throw new UnsupportedOperationException("ClobRef not suported");
     } else if (o instanceof BlobRef) {

Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
+public class AvroInputFormat<T>
+  extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : super.listStatus(job)) {
+      if (file.getPath().getName().endsWith(
+          org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+        result.add(file);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    context.setStatus(split.toString());
+    return new AvroRecordReader<T>();
+  }
+
+}
+

Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** A {@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T>
+  extends RecordReader<AvroWrapper<T>, NullWritable> {
+
+  private FileReader<T> reader;
+  private long start;
+  private long end;
+  private AvroWrapper<T> key;
+  private NullWritable value;
+
+  @Override
+  public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration conf = context.getConfiguration();
+    SeekableInput in = new FsInput(split.getPath(), conf);
+    DatumReader<T> datumReader = new GenericDatumReader<T>();
+    this.reader = DataFileReader.openReader(in, datumReader);
+    reader.sync(split.getStart());                    // sync to start
+    this.start = reader.tell();
+    this.end = split.getStart() + split.getLength();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!reader.hasNext() || reader.pastSync(end)) {
+      key = null;
+      value = null;
+      return false;
+    }
+    if (key == null) {
+      key = new AvroWrapper<T>();
+    }
+    if (value == null) {
+      value = NullWritable.get();
+    }
+    key.datum(reader.next(key.datum()));
+    return true;
+  }
+
+  @Override
+  public AvroWrapper<T> getCurrentKey() throws IOException,
+      InterruptedException {
+    return key;
+  }
+
+  @Override
+  public NullWritable getCurrentValue()
+      throws IOException, InterruptedException {
+    return value;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return reader.tell();
+  }
+
+  @Override
+  public void close() throws IOException { reader.close(); }
+}
+

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java Wed Aug 10 23:58:07 2011
@@ -51,6 +51,13 @@ import com.cloudera.sqoop.util.PerfCount
  */
 public class ExportJobBase extends JobBase {
 
+  /**
+   * The (inferred) type of a file or group of files.
+   */
+  public enum FileType {
+    SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+  }
+
   public static final Log LOG = LogFactory.getLog(
       ExportJobBase.class.getName());
 
@@ -89,6 +96,15 @@ public class ExportJobBase extends JobBa
    */
   public static boolean isSequenceFiles(Configuration conf, Path p)
       throws IOException {
+    return getFileType(conf, p) == FileType.SEQUENCE_FILE;
+  }
+
+  /**
+   * @return the type of the file represented by p (or the files in p, if a
+   * directory)
+   */
+  public static FileType getFileType(Configuration conf, Path p)
+      throws IOException {
     FileSystem fs = p.getFileSystem(conf);
 
     try {
@@ -97,14 +113,14 @@ public class ExportJobBase extends JobBa
       if (null == stat) {
         // Couldn't get the item.
         LOG.warn("Input path " + p + " does not exist");
-        return false;
+        return FileType.UNKNOWN;
       }
 
       if (stat.isDir()) {
         FileStatus [] subitems = fs.listStatus(p);
         if (subitems == null || subitems.length == 0) {
           LOG.warn("Input path " + p + " contains no files");
-          return false; // empty dir.
+          return FileType.UNKNOWN; // empty dir.
         }
 
         // Pick a child entry to examine instead.
@@ -125,14 +141,14 @@ public class ExportJobBase extends JobBa
       if (null == stat) {
         LOG.warn("null FileStatus object in isSequenceFiles(); "
             + "assuming false.");
-        return false;
+        return FileType.UNKNOWN;
       }
 
       Path target = stat.getPath();
-      return hasSequenceFileHeader(target, conf);
+      return fromMagicNumber(target, conf);
     } catch (FileNotFoundException fnfe) {
       LOG.warn("Input path " + p + " does not exist");
-      return false; // doesn't exist!
+      return FileType.UNKNOWN; // doesn't exist!
     }
   }
 
@@ -140,9 +156,9 @@ public class ExportJobBase extends JobBa
    * @param file a file to test.
    * @return true if 'file' refers to a SequenceFile.
    */
-  private static boolean hasSequenceFileHeader(Path file, Configuration conf) {
-    // Test target's header to see if it contains magic numbers indicating it's
-    // a SequenceFile.
+  private static FileType fromMagicNumber(Path file, Configuration conf) {
+    // Test target's header to see if it contains magic numbers indicating its
+    // file type
     byte [] header = new byte[3];
     FSDataInputStream is = null;
     try {
@@ -150,9 +166,9 @@ public class ExportJobBase extends JobBa
       is = fs.open(file);
       is.readFully(header);
     } catch (IOException ioe) {
-      // Error reading header or EOF; assume not a SequenceFile.
-      LOG.warn("IOException checking SequenceFile header: " + ioe);
-      return false;
+      // Error reading header or EOF; assume unknown
+      LOG.warn("IOException checking input file header: " + ioe);
+      return FileType.UNKNOWN;
     } finally {
       try {
         if (null != is) {
@@ -164,8 +180,13 @@ public class ExportJobBase extends JobBa
       }
     }
 
-    // Return true (isSequenceFile) iff the magic number sticks.
-    return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
+    if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
+      return FileType.SEQUENCE_FILE;
+    }
+    if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
+      return FileType.AVRO_DATA_FILE;
+    }
+    return FileType.UNKNOWN;
   }
 
   /**
@@ -363,7 +384,9 @@ public class ExportJobBase extends JobBa
 
   /**
    * @return true if the input directory contains SequenceFiles.
+   * @deprecated use {@link #getInputFileType()} instead
    */
+  @Deprecated
   protected boolean inputIsSequenceFiles() {
     try {
       return isSequenceFiles(
@@ -373,4 +396,12 @@ public class ExportJobBase extends JobBa
       return false;
     }
   }
+
+  protected FileType getInputFileType() {
+    try {
+      return getFileType(context.getOptions().getConf(), getInputPath());
+    } catch (IOException ioe) {
+      return FileType.UNKNOWN;
+    }
+  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java Wed Aug 10 23:58:07 2011
@@ -18,26 +18,32 @@
 
 package com.cloudera.sqoop.mapreduce;
 
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import com.cloudera.sqoop.orm.ClassWriter;
+
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
-
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ExportJobContext;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
  */
 public class JdbcExportJob extends ExportJobBase {
 
+  private FileType fileType;
+
   public static final Log LOG = LogFactory.getLog(
       JdbcExportJob.class.getName());
 
@@ -53,11 +59,50 @@ public class JdbcExportJob extends Expor
   }
 
   @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    fileType = getInputFileType();
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+    if (fileType == FileType.AVRO_DATA_FILE) {
+      LOG.debug("Configuring for Avro export");
+      ConnManager connManager = context.getConnManager();
+      Map<String, Integer> columnTypeInts =
+        connManager.getColumnTypes(tableName, options.getSqlQuery());
+      MapWritable columnTypes = new MapWritable();
+      for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+        Text columnName = new Text(e.getKey());
+        Text columnText = new Text(connManager.toJavaType(e.getValue()));
+        columnTypes.put(columnName, columnText);
+      }
+      DefaultStringifier.store(job.getConfiguration(), columnTypes,
+          AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+    }
+
+  }
+
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    if (fileType == FileType.AVRO_DATA_FILE) {
+      return AvroInputFormat.class;
+    }
+    return super.getInputFormatClass();
+  }
+
+  @Override
   protected Class<? extends Mapper> getMapperClass() {
-    if (inputIsSequenceFiles()) {
-      return SequenceFileExportMapper.class;
-    } else {
-      return TextExportMapper.class;
+    switch (fileType) {
+      case SEQUENCE_FILE:
+        return SequenceFileExportMapper.class;
+      case AVRO_DATA_FILE:
+        return AvroExportMapper.class;
+      case UNKNOWN:
+      default:
+        return TextExportMapper.class;
     }
   }
 
@@ -92,7 +137,5 @@ public class JdbcExportJob extends Expor
     }
   }
 
-
-
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java Wed Aug 10 23:58:07 2011
@@ -1179,21 +1179,7 @@ public class ClassWriter {
   }
 
   protected Map<String, Integer> getColumnTypes() throws IOException {
-    Map<String, Integer> columnTypes;
-    if (null != tableName) {
-      // We're generating a class based on a table import.
-      columnTypes = connManager.getColumnTypes(tableName);
-    } else {
-      // This is based on an arbitrary query.
-      String query = this.options.getSqlQuery();
-      if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
-        throw new IOException("Query [" + query + "] must contain '"
-            + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
-      }
-
-      columnTypes = connManager.getColumnTypesForQuery(query);
-    }
-    return columnTypes;
+    return connManager.getColumnTypes(tableName, options.getSqlQuery());
   }
 
   /**

Added: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java (added)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ExportJobTestCase;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test that we can export Avro Data Files from HDFS into databases.
+ */
+public class TestAvroExport extends ExportJobTestCase {
+
+  /**
+   * Build the argument vector handed to the CodeGenTool when creating
+   * tables to export.
+   * @param extraArgs arguments placed ahead of the standard ones; may be null.
+   * @return extraArgs followed by the --table and --connect arguments.
+   */
+  protected String [] getCodeGenArgv(String... extraArgs) {
+    List<String> argList = new ArrayList<String>();
+
+    int numExtra = (extraArgs == null) ? 0 : extraArgs.length;
+    for (int i = 0; i < numExtra; i++) {
+      argList.add(extraArgs[i]);
+    }
+
+    argList.add("--table");
+    argList.add(getTableName());
+    argList.add("--connect");
+    argList.add(getConnectString());
+
+    return argList.toArray(new String[argList.size()]);
+  }
+
+  /**
+   * Describes one extra column used by the export tests. Each column is
+   * generated according to a ColumnGenerator, which supplies both sides of
+   * the comparison: the value and schema written into the Avro records to
+   * export, and the SQL type and object representation the database is
+   * expected to return for that column.
+   */
+  public interface ColumnGenerator {
+    /**
+     * @return the Avro schema for the field.
+     */
+    Schema getColumnAvroSchema();
+
+    /**
+     * @param rowNum the id of the row being written.
+     * @return the value to write into that row's Avro record for export.
+     */
+    Object getExportValue(int rowNum);
+
+    /**
+     * @return the column type to put in the CREATE TABLE statement.
+     */
+    String getColumnType();
+
+    /**
+     * @param rowNum the id of the row being verified.
+     * @return the value the database should return for this column.
+     */
+    Object getVerifyValue(int rowNum);
+  }
+
+  /**
+   * Create a ColumnGenerator that answers with the same fixed values for
+   * every row; the row number passed to the generator is ignored.
+   * @param exportValue the value returned by getExportValue().
+   * @param schema the schema returned by getColumnAvroSchema().
+   * @param verifyValue the value returned by getVerifyValue().
+   * @param columnType the type returned by getColumnType().
+   * @return a generator backed by the given constants.
+   */
+  private ColumnGenerator colGenerator(final Object exportValue,
+      final Schema schema, final Object verifyValue,
+      final String columnType) {
+    ColumnGenerator constantGen = new ColumnGenerator() {
+      @Override
+      public Schema getColumnAvroSchema() {
+        return schema;
+      }
+
+      @Override
+      public Object getExportValue(int rowNum) {
+        return exportValue;
+      }
+
+      @Override
+      public String getColumnType() {
+        return columnType;
+      }
+
+      @Override
+      public Object getVerifyValue(int rowNum) {
+        return verifyValue;
+      }
+    };
+    return constantGen;
+  }
+
+  /**
+   * Create a data file that gets exported to the db. Every record carries
+   * an 'id' field set to the record index and a 'msg' field set to
+   * getMsgPrefix() followed by that index, plus one field per extra column
+   * that declares an Avro schema.
+   * @param fileNum the number of the file (for multi-file export)
+   * @param numRecords how many records to write to the file.
+   * @param extraCols generators for additional fields in each record.
+   * @throws IOException if the file cannot be created or written.
+   */
+  protected void createAvroFile(int fileNum, int numRecords,
+      ColumnGenerator... extraCols) throws IOException {
+
+    String ext = ".avro";
+    Path tablePath = getTablePath();
+    Path filePath = new Path(tablePath, "part" + fileNum + ext);
+
+    Configuration conf = new Configuration();
+    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    FileSystem fs = FileSystem.get(conf);
+    fs.mkdirs(tablePath);
+
+    // Build the schema before opening the file so that a schema failure
+    // cannot leave an open stream behind.
+    Schema schema = buildAvroSchema(extraCols);
+    DatumWriter<GenericRecord> datumWriter =
+      new GenericDatumWriter<GenericRecord>();
+    DataFileWriter<GenericRecord> dataFileWriter =
+      new DataFileWriter<GenericRecord>(datumWriter);
+
+    OutputStream os = fs.create(filePath);
+    try {
+      // create() is kept outside the inner try: closing a writer that was
+      // never opened would mask the original failure.
+      dataFileWriter.create(schema, os);
+      try {
+        for (int i = 0; i < numRecords; i++) {
+          GenericRecord record = new GenericData.Record(schema);
+          record.put("id", i);
+          record.put("msg", getMsgPrefix() + i);
+          addExtraColumns(record, i, extraCols);
+          dataFileWriter.append(record);
+        }
+      } finally {
+        dataFileWriter.close();
+      }
+    } finally {
+      // Release the stream even when writing fails part-way through.
+      os.close();
+    }
+  }
+
+  /**
+   * Build the record schema for the export file: 'id' and 'msg' followed
+   * by one field per generator that declares an Avro schema. Generators
+   * whose schema is null contribute no field and consume no column index.
+   * @param extraCols generators describing the additional fields.
+   * @return the record schema named "myschema".
+   */
+  private Schema buildAvroSchema(ColumnGenerator... extraCols) {
+    List<Field> schemaFields = new ArrayList<Field>();
+    schemaFields.add(buildAvroField("id", Schema.Type.INT));
+    schemaFields.add(buildAvroField("msg", Schema.Type.STRING));
+
+    int nextIdx = 0;
+    for (ColumnGenerator gen : extraCols) {
+      Schema colSchema = gen.getColumnAvroSchema();
+      if (colSchema == null) {
+        continue;
+      }
+      schemaFields.add(buildAvroField(forIdx(nextIdx), colSchema));
+      nextIdx++;
+    }
+
+    Schema recordSchema = Schema.createRecord("myschema", null, null, false);
+    recordSchema.setFields(schemaFields);
+    return recordSchema;
+  }
+
+  /**
+   * Fill in the generator-backed fields of a record. Generators without
+   * an Avro schema are skipped and do not consume a column index.
+   * @param record the record to populate.
+   * @param rowNum the id of the row being written.
+   * @param extraCols generators supplying the field values.
+   */
+  private void addExtraColumns(GenericRecord record, int rowNum,
+      ColumnGenerator[] extraCols) {
+    int nextIdx = 0;
+    for (int g = 0; g < extraCols.length; g++) {
+      ColumnGenerator gen = extraCols[g];
+      if (gen.getColumnAvroSchema() == null) {
+        continue;
+      }
+      record.put(forIdx(nextIdx), gen.getExportValue(rowNum));
+      nextIdx++;
+    }
+  }
+
+  /**
+   * Create a field of a primitive Avro type with no doc and no default.
+   * @param name the field name.
+   * @param type the primitive type to create a schema for.
+   * @return the new field.
+   */
+  private Field buildAvroField(String name, Schema.Type type) {
+    Schema primitive = Schema.create(type);
+    return buildAvroField(name, primitive);
+  }
+
+  /**
+   * Create a field with the given schema, no doc and no default.
+   * @param name the field name.
+   * @param schema the schema of the field.
+   * @return the new field.
+   */
+  private Field buildAvroField(String name, Schema schema) {
+    Field field = new Field(name, schema, null, null);
+    return field;
+  }
+
+  /**
+   * Return the column name for a column index.
+   * Each table contains two columns named 'id' and 'msg', and then an
+   * arbitrary number of additional columns defined by ColumnGenerators.
+   * These columns are referenced by idx 0, 1, 2...
+   * @param idx the index of the ColumnGenerator in the array passed to
+   *   createTable().
+   * @return the name of the column: "col" followed by the index.
+   */
+  protected String forIdx(int idx) {
+    return "col" + Integer.toString(idx);
+  }
+
+  /**
+   * Build the SQL that drops a table if it exists.
+   * @param tableName the table to drop.
+   * @return the DROP TABLE statement for that table.
+   */
+  protected String getDropTableStatement(String tableName) {
+    StringBuilder sql = new StringBuilder("DROP TABLE ");
+    sql.append(tableName);
+    sql.append(" IF EXISTS");
+    return sql.toString();
+  }
+
+  /**
+   * Create the table definition to export to, removing any prior table.
+   * The table always has 'id' and 'msg' columns; each generator with a
+   * non-null column type adds one more column, named by forIdx().
+   * @param extraColumns generators describing the additional columns.
+   * @throws SQLException if either statement fails.
+   */
+  private void createTable(ColumnGenerator... extraColumns)
+      throws SQLException {
+    Connection conn = getConnection();
+
+    // Drop whatever an earlier run left behind.
+    PreparedStatement dropStmt = conn.prepareStatement(
+        getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      dropStmt.executeUpdate();
+      conn.commit();
+    } finally {
+      dropStmt.close();
+    }
+
+    StringBuilder ddl = new StringBuilder("CREATE TABLE ");
+    ddl.append(getTableName());
+    ddl.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    int nextIdx = 0;
+    for (ColumnGenerator gen : extraColumns) {
+      if (gen.getColumnType() == null) {
+        // A null column type means no database column for this generator.
+        continue;
+      }
+      ddl.append(", ").append(forIdx(nextIdx)).append(" ");
+      ddl.append(gen.getColumnType());
+      nextIdx++;
+    }
+    ddl.append(")");
+
+    PreparedStatement createStmt = conn.prepareStatement(ddl.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      createStmt.executeUpdate();
+      conn.commit();
+    } finally {
+      createStmt.close();
+    }
+  }
+
+  /**
+   * Verify that on a given row, a column has a given value.
+   * Fails the test if no row with the given id exists. byte[] values are
+   * compared element by element; everything else uses equals().
+   * @param id the id column specifying the row to test.
+   * @param colName the column whose value is checked.
+   * @param expectedVal the value the column should hold; may be null.
+   * @throws SQLException if the query fails.
+   */
+  private void assertColValForRowId(int id, String colName, Object expectedVal)
+      throws SQLException {
+    Connection conn = getConnection();
+    LOG.info("Verifying column " + colName + " has value " + expectedVal);
+
+    PreparedStatement statement = conn.prepareStatement(
+        "SELECT " + colName + " FROM " + getTableName() + " WHERE id = " + id,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    Object actualVal = null;
+    try {
+      ResultSet rs = statement.executeQuery();
+      try {
+        // Report a missing row as a test failure rather than letting
+        // getObject() throw on an empty result set.
+        assertTrue("No row with id " + id + " in table " + getTableName(),
+            rs.next());
+        actualVal = rs.getObject(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    // instanceof is already false for null, so no separate null check.
+    if (expectedVal instanceof byte[]) {
+      assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+    } else {
+      assertEquals("Got unexpected column value", expectedVal, actualVal);
+    }
+  }
+
+  /**
+   * Verify that for the rows holding the min and max values of the 'id'
+   * column, the given column holds the values its generator expects.
+   * @param colName the column to check.
+   * @param generator supplies the expected value for each row id.
+   * @throws SQLException if a query fails.
+   */
+  protected void assertColMinAndMax(String colName, ColumnGenerator generator)
+      throws SQLException {
+    Connection conn = getConnection();
+    int lowestId = getMinRowId(conn);
+    int highestId = getMaxRowId(conn);
+
+    LOG.info("Checking min/max for column " + colName + " with type "
+        + generator.getColumnType());
+
+    Object expectedForLowest = generator.getVerifyValue(lowestId);
+    Object expectedForHighest = generator.getVerifyValue(highestId);
+
+    assertColValForRowId(lowestId, colName, expectedForLowest);
+    assertColValForRowId(highestId, colName, expectedForHighest);
+  }
+
+  /**
+   * Export one column for each Avro type listed below and check that the
+   * database holds the expected value in the first and last rows.
+   */
+  public void testSupportedAvroTypes() throws IOException, SQLException {
+    final int totalRecords = 10;
+
+    byte[] rawBytes = new byte[] { (byte) 1, (byte) 2 };
+    Schema fixedSchema = Schema.createFixed("myfixed", null, null, 2);
+    Schema enumSchema = Schema.createEnum("myenum", null, null,
+        Lists.newArrayList("a", "b"));
+
+    ColumnGenerator[] gens = new ColumnGenerator[] {
+      colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
+      colGenerator(100, Schema.create(Schema.Type.INT), 100, "INTEGER"),
+      colGenerator(200L, Schema.create(Schema.Type.LONG), 200L, "BIGINT"),
+      // HSQLDB maps REAL to double, not float:
+      colGenerator(1.0f, Schema.create(Schema.Type.FLOAT), 1.0d, "REAL"),
+      colGenerator(2.0d, Schema.create(Schema.Type.DOUBLE), 2.0d, "DOUBLE"),
+      colGenerator("s", Schema.create(Schema.Type.STRING), "s", "VARCHAR(8)"),
+      colGenerator(ByteBuffer.wrap(rawBytes),
+          Schema.create(Schema.Type.BYTES), rawBytes, "VARBINARY(8)"),
+      colGenerator(new GenericData.Fixed(fixedSchema, rawBytes), fixedSchema,
+          rawBytes, "BINARY(2)"),
+      colGenerator(new GenericData.EnumSymbol(enumSchema, "a"), enumSchema,
+          "a", "VARCHAR(8)"),
+    };
+
+    createAvroFile(0, totalRecords, gens);
+    createTable(gens);
+
+    String[] mapperArgs = newStrArray(new String[0], "-m", "1");
+    runExport(getArgv(true, 10, 10, mapperArgs));
+    verifyExport(totalRecords);
+
+    int colIdx = 0;
+    for (ColumnGenerator gen : gens) {
+      assertColMinAndMax(forIdx(colIdx), gen);
+      colIdx++;
+    }
+  }
+
+  /**
+   * Export two columns declared as a union of string and null, one holding
+   * null and one holding a value, and check both in the database.
+   */
+  public void testNullableField() throws IOException, SQLException {
+    final int totalRecords = 10;
+
+    Schema nullableString = Schema.createUnion(Lists.newArrayList(
+        Schema.create(Schema.Type.STRING),
+        Schema.create(Schema.Type.NULL)));
+
+    ColumnGenerator nullGen =
+        colGenerator(null, nullableString, null, "VARCHAR(64)");
+    ColumnGenerator valueGen =
+        colGenerator("s", nullableString, "s", "VARCHAR(64)");
+
+    createAvroFile(0, totalRecords, nullGen, valueGen);
+    createTable(nullGen, valueGen);
+
+    String[] mapperArgs = newStrArray(new String[0], "-m", "1");
+    runExport(getArgv(true, 10, 10, mapperArgs));
+    verifyExport(totalRecords);
+
+    assertColMinAndMax(forIdx(0), nullGen);
+    assertColMinAndMax(forIdx(1), valueGen);
+  }
+
+  /**
+   * Check that exporting a file whose extra field is a nested Avro record
+   * makes the export fail.
+   */
+  public void testAvroRecordsNotSupported() throws IOException, SQLException {
+    final int totalRecords = 1;
+
+    Schema nestedSchema =
+        Schema.createRecord("nestedrecord", null, null, false);
+    nestedSchema.setFields(Lists.newArrayList(
+        buildAvroField("myint", Schema.Type.INT)));
+    GenericRecord nested = new GenericData.Record(nestedSchema);
+    nested.put("myint", 100);
+
+    // DB type is not used so can be anything:
+    ColumnGenerator gen =
+        colGenerator(nested, nestedSchema, null, "VARCHAR(64)");
+    createAvroFile(0, totalRecords, gen);
+    createTable(gen);
+
+    String[] mapperArgs = newStrArray(new String[0], "-m", "1");
+    try {
+      runExport(getArgv(true, 10, 10, mapperArgs));
+      fail("Avro records can not be exported.");
+    } catch (Exception expected) {
+      // The export is supposed to fail for nested records.
+      assertTrue(true);
+    }
+  }
+
+  /**
+   * Check that an Avro field with no matching database column does not
+   * stop the remaining columns from being exported.
+   */
+  public void testMissingDatabaseFields() throws IOException, SQLException {
+    final int totalRecords = 1;
+
+    // null column type means don't create a database column
+    // the Avro value will not be exported
+    Schema intSchema = Schema.create(Schema.Type.INT);
+    ColumnGenerator avroOnlyGen = colGenerator(100, intSchema, null, null);
+
+    createAvroFile(0, totalRecords, avroOnlyGen);
+    createTable(avroOnlyGen);
+
+    String[] mapperArgs = newStrArray(new String[0], "-m", "1");
+    runExport(getArgv(true, 10, 10, mapperArgs));
+    verifyExport(totalRecords);
+  }
+
+  /**
+   * Check that the export fails when the table has a column for which the
+   * Avro file has no field.
+   */
+  public void testMissingAvroFields()  throws IOException, SQLException {
+    final int totalRecords = 1;
+
+    // null Avro schema means don't create an Avro field
+    ColumnGenerator dbOnlyGen = colGenerator(null, null, null, "VARCHAR(64)");
+    createAvroFile(0, totalRecords, dbOnlyGen);
+    createTable(dbOnlyGen);
+
+    String[] mapperArgs = newStrArray(new String[0], "-m", "1");
+    try {
+      runExport(getArgv(true, 10, 10, mapperArgs));
+      fail("Missing Avro field.");
+    } catch (Exception expected) {
+      // The export is supposed to fail when a field is missing.
+      assertTrue(true);
+    }
+  }
+
+}

Added: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java (added)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.ExportTool;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Tests importing a database table as an Avro Data File then back to the
+ * database.
+ */
+public class TestAvroImportExportRoundtrip extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestAvroImportExportRoundtrip.class.getName());
+
+  /**
+   * Import the table as an Avro Data File, empty the table, export the
+   * file back and check the sum of the first column.
+   */
+  public void testRoundtrip() throws IOException, SQLException {
+    String[] importArgv = getOutputArgv(true);
+    runImport(importArgv);
+
+    deleteTableData();
+
+    String[] mapperArgs = newStrArray(new String[0], "-m", "1");
+    String[] exportArgv = getExportArgv(true, 10, 10, mapperArgs);
+    runExport(exportArgv);
+
+    checkFirstColumnSum();
+  }
+
+  /**
+   * Create the argv to pass to Sqoop for the import half of the test.
+   *
+   * @param includeHadoopFlags if true, the common Hadoop flags come first.
+   * @return the argv as an array of strings.
+   */
+  protected String[] getOutputArgv(boolean includeHadoopFlags) {
+    List<String> importArgs = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(importArgs);
+    }
+
+    String[] sqoopArgs = {
+      "--table", HsqldbTestServer.getTableName(),
+      "--connect", HsqldbTestServer.getUrl(),
+      "--warehouse-dir", getWarehouseDir(),
+      "--split-by", "INTFIELD1",
+      "--as-avrodatafile",
+    };
+    for (String sqoopArg : sqoopArgs) {
+      importArgs.add(sqoopArg);
+    }
+
+    return importArgs.toArray(new String[importArgs.size()]);
+  }
+
+  /**
+   * Create the argv to pass to Sqoop for the export half of the test.
+   * Any "-D" flag and the value following it are moved to the front of
+   * the result; the other additional arguments follow in their original
+   * order, then --table, --export-dir, --connect and "-m 1".
+   * @param includeHadoopFlags accepted but not read by this method.
+   * @param rowsPerStmt accepted but not read by this method.
+   * @param statementsPerTx accepted but not read by this method.
+   * @param additionalArgv extra Hadoop and Sqoop arguments; may be null.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getExportArgv(boolean includeHadoopFlags,
+      int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+    // Hadoop flags (-D foo=bar) must precede the sqoop-specific arguments,
+    // so collect the two kinds separately in one pass.
+    List<String> hadoopArgs = new ArrayList<String>();
+    List<String> sqoopArgs = new ArrayList<String>();
+
+    if (additionalArgv != null) {
+      boolean expectFlagValue = false;
+      for (String arg : additionalArgv) {
+        if ("-D".equals(arg)) {
+          hadoopArgs.add(arg);
+          expectFlagValue = true;
+        } else if (expectFlagValue) {
+          hadoopArgs.add(arg);
+          expectFlagValue = false;
+        } else {
+          // normal argument.
+          sqoopArgs.add(arg);
+        }
+      }
+    }
+
+    List<String> args = new ArrayList<String>(hadoopArgs);
+    args.addAll(sqoopArgs);
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--export-dir");
+    args.add(getTablePath().toString());
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("-m");
+    args.add("1");
+
+    LOG.debug("args:");
+    for (String a : args) {
+      LOG.debug("  " + a);
+    }
+
+    return args.toArray(new String[args.size()]);
+  }
+
+  /**
+   * This test just uses the two int table.
+   * @return the table name reported by HsqldbTestServer.
+   */
+  protected String getTableName() {
+    String serverTable = HsqldbTestServer.getTableName();
+    return serverTable;
+  }
+
+  /**
+   * Delete every row from the test table and commit the change.
+   * @throws SQLException if the statement fails.
+   */
+  private void deleteTableData() throws SQLException {
+    String deleteSql = "DELETE FROM " + getTableName();
+    Connection conn = getConnection();
+    PreparedStatement deleteStmt = conn.prepareStatement(deleteSql,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      deleteStmt.executeUpdate();
+      conn.commit();
+    } finally {
+      deleteStmt.close();
+    }
+  }
+
+  /**
+   * Run a MapReduce-based export (using the argv provided to control
+   * execution).
+   * @param argv the arguments passed to the export tool.
+   * @return the jar files reported by the export tool.
+   * @throws IOException if the tool throws or returns a non-zero status;
+   *   when the tool threw, that exception is attached as the cause.
+   */
+  protected List<String> runExport(String [] argv) throws IOException {
+    // run the tool through the normal entry-point.
+    int ret;
+    List<String> generatedJars = null;
+    Exception failure = null;
+    try {
+      ExportTool exporter = new ExportTool();
+      Sqoop sqoop = new Sqoop(exporter);
+      ret = Sqoop.runSqoop(sqoop, argv);
+      generatedJars = exporter.getGeneratedJarFiles();
+    } catch (Exception e) {
+      LOG.error("Got exception running Sqoop: "
+          + StringUtils.stringifyException(e));
+      ret = 1;
+      // Remember the exception so the caller sees the real reason.
+      failure = e;
+    }
+
+    // expect a successful return.
+    if (0 != ret) {
+      throw new IOException("Failure during job; return status " + ret,
+          failure);
+    }
+
+    return generatedJars;
+  }
+
+  /**
+   * Check that the sum of INTFIELD1 over the test table equals the value
+   * reported by HsqldbTestServer.getFirstColSum().
+   * @throws SQLException if the query fails.
+   */
+  private void checkFirstColumnSum() throws SQLException {
+    Connection conn = getConnection();
+
+    PreparedStatement statement = conn.prepareStatement(
+        "SELECT SUM(INTFIELD1) FROM " + getTableName(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    int actualVal = 0;
+    try {
+      ResultSet rs = statement.executeQuery();
+      try {
+        // Fail clearly if the query returns no row at all.
+        assertTrue("No result row for SUM(INTFIELD1)", rs.next());
+        actualVal = rs.getInt(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    assertEquals("First column sum", HsqldbTestServer.getFirstColSum(),
+        actualVal);
+  }
+}

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java Wed Aug 10 23:58:07 2011
@@ -51,28 +51,11 @@ import com.cloudera.sqoop.testutil.Expor
 import com.cloudera.sqoop.tool.CodeGenTool;
 import com.cloudera.sqoop.util.ClassLoaderStack;
 
-import org.junit.Before;
-
 /**
  * Test that we can export data from HDFS into databases.
  */
 public class TestExport extends ExportJobTestCase {
 
-  @Before
-  public void setUp() {
-    // start the server
-    super.setUp();
-
-    if (useHsqldbTestServer()) {
-      // throw away any existing data that might be in the database.
-      try {
-        this.getTestServer().dropExistingSchema();
-      } catch (SQLException sqlE) {
-        fail(sqlE.toString());
-      }
-    }
-  }
-
   /**
    * @return an argv for the CodeGenTool to use when creating tables to export.
    */

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java Wed Aug 10 23:58:07 2011
@@ -43,21 +43,6 @@ import org.junit.Before;
  */
 public class TestExportUpdate extends ExportJobTestCase {
 
-  @Before
-  public void setUp() {
-    // start the server
-    super.setUp();
-
-    if (useHsqldbTestServer()) {
-      // throw away any existing data that might be in the database.
-      try {
-        this.getTestServer().dropExistingSchema();
-      } catch (SQLException sqlE) {
-        fail(sqlE.toString());
-      }
-    }
-  }
-
   @Override
   protected String getTablePrefix() {
     return "UPDATE_TABLE_";

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java Wed Aug 10 23:58:07 2011
@@ -41,6 +41,7 @@ import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.tool.ImportTool;
+import com.google.common.collect.ObjectArrays;
 
 import junit.framework.TestCase;
 
@@ -132,6 +133,19 @@ public abstract class BaseSqoopTestCase 
   protected ConnManager getManager() {
     return manager;
   }
+  
+
+  /**
+   * Open a connection to the database under test.
+   * @return a connection to the database under test, or null if the test
+   *   server could not provide one (the failure is logged).
+   */
+  protected Connection getConnection() {
+    try {
+      return getTestServer().getConnection();
+    } catch (SQLException sqlE) {
+      // Pass the exception itself so the log keeps the stack trace.
+      LOG.error("Could not get connection to test server: " + sqlE, sqlE);
+      return null;
+    }
+  }
 
   // instance variables populated during setUp, used during tests
   private HsqldbTestServer testServer;
@@ -439,4 +453,23 @@ public abstract class BaseSqoopTestCase 
       getManager().release();
     }
   }
+
+  /**
+   * Build a string array holding 'entries' followed by 'moreEntries'.
+   * @param entries initial entries in the array; null counts as empty
+   *   unless moreEntries is also null.
+   * @param moreEntries variable-length additional entries; if null,
+   *   'entries' is returned as it was passed in.
+   * @return an array containing entries with all of moreEntries appended.
+   */
+  protected String [] newStrArray(String [] entries, String... moreEntries)  {
+    if (moreEntries == null) {
+      return entries;
+    }
+
+    String [] head = (entries != null) ? entries : new String[0];
+    return ObjectArrays.concat(head, moreEntries, String.class);
+  }
 }

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java Wed Aug 10 23:58:07 2011
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
 
 import com.cloudera.sqoop.Sqoop;
 import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
@@ -42,6 +43,21 @@ public abstract class ExportJobTestCase 
 
   public static final Log LOG = LogFactory.getLog(
       ExportJobTestCase.class.getName());
+  
+  @Before
+  public void setUp() {
+    // start the server
+    super.setUp();
+
+    if (!useHsqldbTestServer()) {
+      return;
+    }
+
+    // throw away any existing data that might be in the database.
+    try {
+      this.getTestServer().dropExistingSchema();
+    } catch (SQLException sqlE) {
+      fail(sqlE.toString());
+    }
+  }
 
   protected String getTablePrefix() {
     return "EXPORT_TABLE_";
@@ -57,18 +73,6 @@ public abstract class ExportJobTestCase 
   }
 
   /**
-   * @return a connection to the database under test.
-   */
-  protected Connection getConnection() {
-    try {
-      return getTestServer().getConnection();
-    } catch (SQLException sqlE) {
-      LOG.error("Could not get connection to test server: " + sqlE);
-      return null;
-    }
-  }
-
-  /**
    * Create the argv to pass to Sqoop.
    * @param includeHadoopFlags if true, then include -D various.settings=values
    * @param rowsPerStmt number of rows to export in a single INSERT statement.