Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/08/11 01:58:07 UTC
svn commit: r1156405 - in /incubator/sqoop/trunk/src:
java/com/cloudera/sqoop/manager/ java/com/cloudera/sqoop/mapreduce/
java/com/cloudera/sqoop/orm/ test/com/cloudera/sqoop/
test/com/cloudera/sqoop/testutil/
Author: arvind
Date: Wed Aug 10 23:58:07 2011
New Revision: 1156405
URL: http://svn.apache.org/viewvc?rev=1156405&view=rev
Log:
SQOOP-305. Support export from Avro Data Files.
(Tom White via Arvind Prabhakar)
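Before the file-by-file diff, a quick orientation sketch (not part of the commit): writing an Avro data file in the shape the new export path consumes, using the same Avro APIs that appear below. The file name "part0.avro" and the "id"/"msg" columns mirror the TestAvroExport fixtures and are otherwise arbitrary.

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroExportFileSketch {
  public static void main(String[] args) throws IOException {
    // One Avro field per database column; field names must match the table's
    // column names for AvroExportMapper to resolve them.
    Schema schema = Schema.createRecord("myschema", null, null, false);
    schema.setFields(Arrays.asList(
        new Field("id", Schema.create(Schema.Type.INT), null, null),
        new Field("msg", Schema.create(Schema.Type.STRING), null, null)));

    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
        new GenericDatumWriter<GenericRecord>(schema));
    // The new AvroInputFormat only picks up files ending in ".avro".
    writer.create(schema, new File("part0.avro"));

    for (int i = 0; i < 10; i++) {
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", i);
      record.put("msg", "record " + i);
      writer.append(record);
    }
    writer.close();
  }
}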
Added:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Wed Aug 10 23:58:07 2011
@@ -91,6 +91,34 @@ public abstract class ConnManager {
public abstract Map<String, Integer> getColumnTypes(String tableName);
/**
+ * Return an unordered mapping from colname to sqltype for
+ * all columns in a table or query.
+ *
+ * The Integer type id is a constant from java.sql.Types.
+ *
+ * @param tableName the name of the table
+ * @param sqlQuery the SQL query to use if tableName is null
+ */
+ public Map<String, Integer> getColumnTypes(String tableName,
+ String sqlQuery) throws IOException {
+ Map<String, Integer> columnTypes;
+ if (null != tableName) {
+ // We're generating a class based on a table import.
+ columnTypes = getColumnTypes(tableName);
+ } else {
+ // This is based on an arbitrary query.
+ String query = sqlQuery;
+ if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
+ throw new IOException("Query [" + query + "] must contain '"
+ + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
+ }
+
+ columnTypes = getColumnTypesForQuery(query);
+ }
+ return columnTypes;
+ }
+
+ /**
* This method allows various connection managers to indicate if they support
* staging data for export jobs. The managers that do support this must
* override this method and return <tt>true</tt>.
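A usage sketch for the new overload (hedged: the EMPLOYEES table and the query are invented, and 'manager' is assumed to be an already-initialized ConnManager subclass). The map values are java.sql.Types ids, and a free-form query must carry Sqoop's substitution token, the literal "$CONDITIONS", in its WHERE clause:

import java.io.IOException;
import java.sql.Types;
import java.util.Map;

import com.cloudera.sqoop.manager.ConnManager;

public class ColumnTypesSketch {
  static void printColumnTypes(ConnManager manager) throws IOException {
    // Table-based: the sqlQuery argument is ignored when tableName is non-null.
    Map<String, Integer> types = manager.getColumnTypes("EMPLOYEES", null);
    for (Map.Entry<String, Integer> e : types.entrySet()) {
      // Values are java.sql.Types constants, e.g. Types.VARCHAR for strings.
      String suffix = (e.getValue() == Types.VARCHAR) ? " (VARCHAR)" : "";
      System.out.println(e.getKey() + " -> " + e.getValue() + suffix);
    }
    // Query-based: tableName is null; the query must contain the substitution
    // token in its WHERE clause or an IOException is thrown (see above).
    manager.getColumnTypes(null,
        "SELECT ID, MSG FROM EMPLOYEES WHERE $CONDITIONS");
  }
}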
Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.orm.ClassWriter;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Exports records from an Avro data file.
+ */
+public class AvroExportMapper
+ extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable,
+ SqoopRecord, NullWritable> {
+
+ private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+
+ private static final String TIME_TYPE = "java.sql.Time";
+
+ private static final String DATE_TYPE = "java.sql.Date";
+
+ private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+
+ static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map";
+
+ private MapWritable columnTypes;
+ private SqoopRecord recordImpl;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ // Instantiate a copy of the user's class to hold and parse the record.
+ String recordClassName = conf.get(
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+ if (null == recordClassName) {
+ throw new IOException("Export table class name ("
+ + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ + ") is not set!");
+ }
+
+ try {
+ Class cls = Class.forName(recordClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ if (null == recordImpl) {
+ throw new IOException("Could not instantiate object of type "
+ + recordClassName);
+ }
+
+ columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
+ MapWritable.class);
+ }
+
+ @Override
+ protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
+ Context context) throws IOException, InterruptedException {
+ context.write(toSqoopRecord(key.datum()), NullWritable.get());
+ }
+
+ private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
+ Schema avroSchema = record.getSchema();
+ for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
+ String columnName = e.getKey().toString();
+ String columnType = e.getValue().toString();
+ String cleanedCol = ClassWriter.toIdentifier(columnName);
+ Field field = getField(avroSchema, cleanedCol, record);
+ if (field == null) {
+ throw new IOException("Cannot find field " + cleanedCol
+ + " in Avro schema " + avroSchema);
+ } else {
+ Object avroObject = record.get(field.name());
+ Object fieldVal = fromAvro(avroObject, field.schema(), columnType);
+ recordImpl.setField(cleanedCol, fieldVal);
+ }
+ }
+ return recordImpl;
+ }
+
+ private Field getField(Schema avroSchema, String fieldName,
+ GenericRecord record) {
+ for (Field field : avroSchema.getFields()) {
+ if (field.name().equalsIgnoreCase(fieldName)) {
+ return field;
+ }
+ }
+ return null;
+ }
+
+ private Object fromAvro(Object avroObject, Schema fieldSchema,
+ String columnType) {
+ // map from Avro type to Sqoop's Java representation of the SQL type
+ // see SqlManager#toJavaType
+
+ if (avroObject == null) {
+ return null;
+ }
+
+ switch (fieldSchema.getType()) {
+ case NULL:
+ return null;
+ case BOOLEAN:
+ case INT:
+ case FLOAT:
+ case DOUBLE:
+ return avroObject;
+ case LONG:
+ if (columnType.equals(DATE_TYPE)) {
+ return new Date((Long) avroObject);
+ } else if (columnType.equals(TIME_TYPE)) {
+ return new Time((Long) avroObject);
+ } else if (columnType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp((Long) avroObject);
+ }
+ return avroObject;
+ case BYTES:
+ ByteBuffer bb = (ByteBuffer) avroObject;
+ BytesWritable bw = new BytesWritable();
+ bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+ return bw;
+ case STRING:
+ if (columnType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(avroObject.toString());
+ } else if (columnType.equals(DATE_TYPE)) {
+ return Date.valueOf(avroObject.toString());
+ } else if (columnType.equals(TIME_TYPE)) {
+ return Time.valueOf(avroObject.toString());
+ } else if (columnType.equals(TIMESTAMP_TYPE)) {
+ return Timestamp.valueOf(avroObject.toString());
+ }
+ return avroObject.toString();
+ case ENUM:
+ return ((GenericEnumSymbol) avroObject).toString();
+ case UNION:
+ List<Schema> types = fieldSchema.getTypes();
+ if (types.size() != 2) {
+ throw new IllegalArgumentException("Only support union with null");
+ }
+ Schema s1 = types.get(0);
+ Schema s2 = types.get(1);
+ if (s1.getType() == Schema.Type.NULL) {
+ return fromAvro(avroObject, s2, columnType);
+ } else if (s2.getType() == Schema.Type.NULL) {
+ return fromAvro(avroObject, s1, columnType);
+ } else {
+ throw new IllegalArgumentException("Only support union with null");
+ }
+ case FIXED:
+ return new BytesWritable(((GenericFixed) avroObject).bytes());
+ case RECORD:
+ case ARRAY:
+ case MAP:
+ default:
+ throw new IllegalArgumentException("Cannot convert Avro type "
+ + fieldSchema.getType());
+ }
+ }
+
+}
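The AVRO_COLUMN_TYPES_MAP key above is the mapper's half of a round trip through the job configuration: JdbcExportJob (below) stores the map, and setup() loads it. A self-contained sketch of that mechanism, using only Hadoop's DefaultStringifier (the column names here are invented):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class ColumnTypesMapRoundTrip {
  public static void main(String[] args) throws IOException {
    // The default Configuration registers WritableSerialization, which
    // DefaultStringifier needs to (de)serialize the MapWritable.
    Configuration conf = new Configuration();

    // Client side: column name -> Java type name (cf. JdbcExportJob below).
    MapWritable columnTypes = new MapWritable();
    columnTypes.put(new Text("id"), new Text("Integer"));
    columnTypes.put(new Text("msg"), new Text("String"));
    DefaultStringifier.store(conf, columnTypes,
        "sqoop.avro.column.types.map");  // AvroExportMapper.AVRO_COLUMN_TYPES_MAP

    // Task side: AvroExportMapper.setup() performs this load.
    MapWritable loaded = DefaultStringifier.load(conf,
        "sqoop.avro.column.types.map", MapWritable.class);
    System.out.println(loaded.get(new Text("id")));  // prints "Integer"
  }
}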
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java Wed Aug 10 23:58:07 2011
@@ -24,6 +24,7 @@ import com.cloudera.sqoop.lib.SqoopRecor
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@@ -33,6 +34,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@@ -84,6 +86,9 @@ public class AvroImportMapper
return ((Time) o).getTime();
} else if (o instanceof Timestamp) {
return ((Timestamp) o).getTime();
+ } else if (o instanceof BytesWritable) {
+ BytesWritable bw = (BytesWritable) o;
+ return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
} else if (o instanceof ClobRef) {
throw new UnsupportedOperationException("ClobRef not supported");
} else if (o instanceof BlobRef) {
Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
+public class AvroInputFormat<T>
+ extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(job)) {
+ if (file.getPath().getName().endsWith(
+ org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+ result.add(file);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ context.setStatus(split.toString());
+ return new AvroRecordReader<T>();
+ }
+
+}
+
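Wiring this format into a new-API job is a single setInputFormatClass call; a hedged sketch (the job setup and input path are invented, constructors per the Hadoop 0.20-era API this code targets):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.cloudera.sqoop.mapreduce.AvroInputFormat;

public class AvroInputFormatWiring {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "avro-export-sketch");
    FileInputFormat.addInputPath(job, new Path("/user/example/export-dir"));
    // listStatus() above keeps only paths ending in ".avro" (the
    // AvroOutputFormat.EXT suffix), so stray entries such as _SUCCESS
    // markers in the input directory are skipped.
    job.setInputFormatClass(AvroInputFormat.class);
    // The format hands each mapper (AvroWrapper<T>, NullWritable) pairs,
    // which is exactly the input signature of AvroExportMapper above.
  }
}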
Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** A {@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T>
+ extends RecordReader<AvroWrapper<T>, NullWritable> {
+
+ private FileReader<T> reader;
+ private long start;
+ private long end;
+ private AvroWrapper<T> key;
+ private NullWritable value;
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ FileSplit split = (FileSplit) genericSplit;
+ Configuration conf = context.getConfiguration();
+ SeekableInput in = new FsInput(split.getPath(), conf);
+ DatumReader<T> datumReader = new GenericDatumReader<T>();
+ this.reader = DataFileReader.openReader(in, datumReader);
+ reader.sync(split.getStart()); // sync to start
+ this.start = reader.tell();
+ this.end = split.getStart() + split.getLength();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!reader.hasNext() || reader.pastSync(end)) {
+ key = null;
+ value = null;
+ return false;
+ }
+ if (key == null) {
+ key = new AvroWrapper<T>();
+ }
+ if (value == null) {
+ value = NullWritable.get();
+ }
+ key.datum(reader.next(key.datum()));
+ return true;
+ }
+
+ @Override
+ public AvroWrapper<T> getCurrentKey() throws IOException,
+ InterruptedException {
+ return key;
+ }
+
+ @Override
+ public NullWritable getCurrentValue()
+ throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+ }
+ }
+
+ public long getPos() throws IOException {
+ return reader.tell();
+ }
+
+ @Override
+ public void close() throws IOException { reader.close(); }
+}
+
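The reader implements the standard split contract for sync-marked container files: seek to the first sync point at or after the split start, then read until the first record past the split end. A standalone sketch of the same pattern against a local file, using Avro's DataFileReader directly (the file name and split bounds are invented):

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class SplitReadingSketch {
  // Read only the records that belong to the byte range [start, start+length),
  // mirroring the sync()/pastSync() usage in AvroRecordReader above.
  static void readSplit(File file, long start, long length) throws IOException {
    FileReader<GenericRecord> reader = DataFileReader.openReader(file,
        new GenericDatumReader<GenericRecord>());
    try {
      reader.sync(start);               // skip to the first sync point >= start
      long end = start + length;
      GenericRecord record = null;
      while (reader.hasNext() && !reader.pastSync(end)) {
        record = reader.next(record);   // reuse the container, as the reader does
        System.out.println(record);
      }
    } finally {
      reader.close();
    }
  }
}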
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java Wed Aug 10 23:58:07 2011
@@ -51,6 +51,13 @@ import com.cloudera.sqoop.util.PerfCount
*/
public class ExportJobBase extends JobBase {
+ /**
+ * The (inferred) type of a file or group of files.
+ */
+ public enum FileType {
+ SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+ }
+
public static final Log LOG = LogFactory.getLog(
ExportJobBase.class.getName());
@@ -89,6 +96,15 @@ public class ExportJobBase extends JobBa
*/
public static boolean isSequenceFiles(Configuration conf, Path p)
throws IOException {
+ return getFileType(conf, p) == FileType.SEQUENCE_FILE;
+ }
+
+ /**
+ * @return the type of the file represented by p (or the files in p, if a
+ * directory)
+ */
+ public static FileType getFileType(Configuration conf, Path p)
+ throws IOException {
FileSystem fs = p.getFileSystem(conf);
try {
@@ -97,14 +113,14 @@ public class ExportJobBase extends JobBa
if (null == stat) {
// Couldn't get the item.
LOG.warn("Input path " + p + " does not exist");
- return false;
+ return FileType.UNKNOWN;
}
if (stat.isDir()) {
FileStatus [] subitems = fs.listStatus(p);
if (subitems == null || subitems.length == 0) {
LOG.warn("Input path " + p + " contains no files");
- return false; // empty dir.
+ return FileType.UNKNOWN; // empty dir.
}
// Pick a child entry to examine instead.
@@ -125,14 +141,14 @@ public class ExportJobBase extends JobBa
if (null == stat) {
LOG.warn("null FileStatus object in isSequenceFiles(); "
+ "assuming false.");
- return false;
+ return FileType.UNKNOWN;
}
Path target = stat.getPath();
- return hasSequenceFileHeader(target, conf);
+ return fromMagicNumber(target, conf);
} catch (FileNotFoundException fnfe) {
LOG.warn("Input path " + p + " does not exist");
- return false; // doesn't exist!
+ return FileType.UNKNOWN; // doesn't exist!
}
}
@@ -140,9 +156,9 @@ public class ExportJobBase extends JobBa
* @param file a file to test.
- * @return true if 'file' refers to a SequenceFile.
+ * @return the FileType indicated by the file's header magic number.
*/
- private static boolean hasSequenceFileHeader(Path file, Configuration conf) {
- // Test target's header to see if it contains magic numbers indicating it's
- // a SequenceFile.
+ private static FileType fromMagicNumber(Path file, Configuration conf) {
+ // Test target's header to see if it contains magic numbers indicating its
+ // file type.
byte [] header = new byte[3];
FSDataInputStream is = null;
try {
@@ -150,9 +166,9 @@ public class ExportJobBase extends JobBa
is = fs.open(file);
is.readFully(header);
} catch (IOException ioe) {
- // Error reading header or EOF; assume not a SequenceFile.
- LOG.warn("IOException checking SequenceFile header: " + ioe);
- return false;
+ // Error reading header or EOF; assume unknown.
+ LOG.warn("IOException checking input file header: " + ioe);
+ return FileType.UNKNOWN;
} finally {
try {
if (null != is) {
@@ -164,8 +180,13 @@ public class ExportJobBase extends JobBa
}
}
- // Return true (isSequenceFile) iff the magic number sticks.
- return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
+ if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
+ return FileType.SEQUENCE_FILE;
+ }
+ if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
+ return FileType.AVRO_DATA_FILE;
+ }
+ return FileType.UNKNOWN;
}
/**
@@ -363,7 +384,9 @@ public class ExportJobBase extends JobBa
/**
* @return true if the input directory contains SequenceFiles.
+ * @deprecated use {@link #getInputFileType()} instead
*/
+ @Deprecated
protected boolean inputIsSequenceFiles() {
try {
return isSequenceFiles(
@@ -373,4 +396,12 @@ public class ExportJobBase extends JobBa
return false;
}
}
+
+ protected FileType getInputFileType() {
+ try {
+ return getFileType(context.getOptions().getConf(), getInputPath());
+ } catch (IOException ioe) {
+ return FileType.UNKNOWN;
+ }
+ }
}
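Both checks come down to three header bytes: SequenceFiles start with the ASCII bytes "SEQ", and Avro data files start with "Obj" (the Avro container magic is 'O', 'b', 'j' followed by a version byte). For experimentation, the same test can be restated against the local filesystem with plain java.io (the path handling and enum are local to the sketch):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class MagicNumberSketch {
  enum FileType { SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN }

  static FileType fromMagicNumber(String path) {
    byte[] header = new byte[3];
    try {
      DataInputStream in = new DataInputStream(new FileInputStream(path));
      try {
        in.readFully(header);           // throws EOFException on short files
      } finally {
        in.close();
      }
    } catch (IOException ioe) {
      return FileType.UNKNOWN;          // unreadable or too short: unknown
    }
    if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
      return FileType.SEQUENCE_FILE;
    }
    if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
      return FileType.AVRO_DATA_FILE;
    }
    return FileType.UNKNOWN;
  }
}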
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java Wed Aug 10 23:58:07 2011
@@ -18,26 +18,32 @@
package com.cloudera.sqoop.mapreduce;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import com.cloudera.sqoop.orm.ClassWriter;
+
import java.io.IOException;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
-
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ExportJobContext;
/**
* Run an export using JDBC (JDBC-based ExportOutputFormat).
*/
public class JdbcExportJob extends ExportJobBase {
+ private FileType fileType;
+
public static final Log LOG = LogFactory.getLog(
JdbcExportJob.class.getName());
@@ -53,11 +59,50 @@ public class JdbcExportJob extends Expor
}
@Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+
+ fileType = getInputFileType();
+
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+ if (fileType == FileType.AVRO_DATA_FILE) {
+ LOG.debug("Configuring for Avro export");
+ ConnManager connManager = context.getConnManager();
+ Map<String, Integer> columnTypeInts =
+ connManager.getColumnTypes(tableName, options.getSqlQuery());
+ MapWritable columnTypes = new MapWritable();
+ for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+ Text columnName = new Text(e.getKey());
+ Text columnText = new Text(connManager.toJavaType(e.getValue()));
+ columnTypes.put(columnName, columnText);
+ }
+ DefaultStringifier.store(job.getConfiguration(), columnTypes,
+ AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+ }
+
+ }
+
+ @Override
+ protected Class<? extends InputFormat> getInputFormatClass()
+ throws ClassNotFoundException {
+ if (fileType == FileType.AVRO_DATA_FILE) {
+ return AvroInputFormat.class;
+ }
+ return super.getInputFormatClass();
+ }
+
+ @Override
protected Class<? extends Mapper> getMapperClass() {
- if (inputIsSequenceFiles()) {
- return SequenceFileExportMapper.class;
- } else {
- return TextExportMapper.class;
+ switch (fileType) {
+ case SEQUENCE_FILE:
+ return SequenceFileExportMapper.class;
+ case AVRO_DATA_FILE:
+ return AvroExportMapper.class;
+ case UNKNOWN:
+ default:
+ return TextExportMapper.class;
}
}
@@ -92,7 +137,5 @@ public class JdbcExportJob extends Expor
}
}
-
-
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/ClassWriter.java Wed Aug 10 23:58:07 2011
@@ -1179,21 +1179,7 @@ public class ClassWriter {
}
protected Map<String, Integer> getColumnTypes() throws IOException {
- Map<String, Integer> columnTypes;
- if (null != tableName) {
- // We're generating a class based on a table import.
- columnTypes = connManager.getColumnTypes(tableName);
- } else {
- // This is based on an arbitrary query.
- String query = this.options.getSqlQuery();
- if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
- throw new IOException("Query [" + query + "] must contain '"
- + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
- }
-
- columnTypes = connManager.getColumnTypesForQuery(query);
- }
- return columnTypes;
+ return connManager.getColumnTypes(tableName, options.getSqlQuery());
}
/**
Added: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java (added)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroExport.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ExportJobTestCase;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test that we can export Avro Data Files from HDFS into databases.
+ */
+public class TestAvroExport extends ExportJobTestCase {
+
+ /**
+ * @return an argv for the CodeGenTool to use when creating tables to export.
+ */
+ protected String [] getCodeGenArgv(String... extraArgs) {
+ List<String> codeGenArgv = new ArrayList<String>();
+
+ if (null != extraArgs) {
+ for (String arg : extraArgs) {
+ codeGenArgv.add(arg);
+ }
+ }
+
+ codeGenArgv.add("--table");
+ codeGenArgv.add(getTableName());
+ codeGenArgv.add("--connect");
+ codeGenArgv.add(getConnectString());
+
+ return codeGenArgv.toArray(new String[0]);
+ }
+
+ /** When generating data for export tests, each column is generated
+ according to a ColumnGenerator. Methods exist for determining
+ what to put into Avro objects in the files to export, as well
+ as what the object representation of the column as returned by
+ the database should look like.
+ */
+ public interface ColumnGenerator {
+ /** For a row with id rowNum, what should we write into that
+ Avro record to export?
+ */
+ Object getExportValue(int rowNum);
+
+ /** Return the Avro schema for the field. */
+ Schema getColumnAvroSchema();
+
+ /** For a row with id rowNum, what should the database return
+ for the given column's value?
+ */
+ Object getVerifyValue(int rowNum);
+
+ /** Return the column type to put in the CREATE TABLE statement. */
+ String getColumnType();
+ }
+
+ private ColumnGenerator colGenerator(final Object exportValue,
+ final Schema schema, final Object verifyValue,
+ final String columnType) {
+ return new ColumnGenerator() {
+ @Override
+ public Object getVerifyValue(int rowNum) {
+ return verifyValue;
+ }
+ @Override
+ public Object getExportValue(int rowNum) {
+ return exportValue;
+ }
+ @Override
+ public String getColumnType() {
+ return columnType;
+ }
+ @Override
+ public Schema getColumnAvroSchema() {
+ return schema;
+ }
+ };
+ }
+
+ /**
+ * Create a data file that gets exported to the db.
+ * @param fileNum the number of the file (for multi-file export)
+ * @param numRecords how many records to write to the file.
+ */
+ protected void createAvroFile(int fileNum, int numRecords,
+ ColumnGenerator... extraCols) throws IOException {
+
+ String ext = ".avro";
+ Path tablePath = getTablePath();
+ Path filePath = new Path(tablePath, "part" + fileNum + ext);
+
+ Configuration conf = new Configuration();
+ if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+ conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+ }
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(tablePath);
+ OutputStream os = fs.create(filePath);
+
+ Schema schema = buildAvroSchema(extraCols);
+ DatumWriter<GenericRecord> datumWriter =
+ new GenericDatumWriter<GenericRecord>();
+ DataFileWriter<GenericRecord> dataFileWriter =
+ new DataFileWriter<GenericRecord>(datumWriter);
+ dataFileWriter.create(schema, os);
+
+ for (int i = 0; i < numRecords; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", i);
+ record.put("msg", getMsgPrefix() + i);
+ addExtraColumns(record, i, extraCols);
+ dataFileWriter.append(record);
+ }
+
+ dataFileWriter.close();
+ os.close();
+ }
+
+ private Schema buildAvroSchema(ColumnGenerator... extraCols) {
+ List<Field> fields = new ArrayList<Field>();
+ fields.add(buildAvroField("id", Schema.Type.INT));
+ fields.add(buildAvroField("msg", Schema.Type.STRING));
+ int colNum = 0;
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getColumnAvroSchema() != null) {
+ fields.add(buildAvroField(forIdx(colNum++),
+ gen.getColumnAvroSchema()));
+ }
+ }
+ Schema schema = Schema.createRecord("myschema", null, null, false);
+ schema.setFields(fields);
+ return schema;
+ }
+
+ private void addExtraColumns(GenericRecord record, int rowNum,
+ ColumnGenerator[] extraCols) {
+ int colNum = 0;
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getColumnAvroSchema() != null) {
+ record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+ }
+ }
+ }
+
+ private Field buildAvroField(String name, Schema.Type type) {
+ return new Field(name, Schema.create(type), null, null);
+ }
+
+ private Field buildAvroField(String name, Schema schema) {
+ return new Field(name, schema, null, null);
+ }
+
+ /** Return the column name for a column index.
+ * Each table contains two columns named 'id' and 'msg', and then an
+ * arbitrary number of additional columns defined by ColumnGenerators.
+ * These columns are referenced by idx 0, 1, 2...
+ * @param idx the index of the ColumnGenerator in the array passed to
+ * createTable().
+ * @return the name of the column
+ */
+ protected String forIdx(int idx) {
+ return "col" + idx;
+ }
+
+ /**
+ * Return a SQL statement that drops a table, if it exists.
+ * @param tableName the table to drop.
+ * @return the SQL statement to drop that table.
+ */
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE " + tableName + " IF EXISTS";
+ }
+
+ /** Create the table definition to export to, removing any prior table.
+ By specifying ColumnGenerator arguments, you can add extra columns
+ to the table of arbitrary type.
+ */
+ private void createTable(ColumnGenerator... extraColumns)
+ throws SQLException {
+ Connection conn = getConnection();
+ PreparedStatement statement = conn.prepareStatement(
+ getDropTableStatement(getTableName()),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE ");
+ sb.append(getTableName());
+ sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+ int colNum = 0;
+ for (ColumnGenerator gen : extraColumns) {
+ if (gen.getColumnType() != null) {
+ sb.append(", " + forIdx(colNum++) + " " + gen.getColumnType());
+ }
+ }
+ sb.append(")");
+
+ statement = conn.prepareStatement(sb.toString(),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ }
+
+ /** Verify that on a given row, a column has a given value.
+ * @param id the id column specifying the row to test.
+ */
+ private void assertColValForRowId(int id, String colName, Object expectedVal)
+ throws SQLException {
+ Connection conn = getConnection();
+ LOG.info("Verifying column " + colName + " has value " + expectedVal);
+
+ PreparedStatement statement = conn.prepareStatement(
+ "SELECT " + colName + " FROM " + getTableName() + " WHERE id = " + id,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ Object actualVal = null;
+ try {
+ ResultSet rs = statement.executeQuery();
+ try {
+ rs.next();
+ actualVal = rs.getObject(1);
+ } finally {
+ rs.close();
+ }
+ } finally {
+ statement.close();
+ }
+
+ if (expectedVal != null && expectedVal instanceof byte[]) {
+ assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+ } else {
+ assertEquals("Got unexpected column value", expectedVal, actualVal);
+ }
+ }
+
+ /** Verify that for the max and min values of the 'id' column, the values
+ for a given column meet the expected values.
+ */
+ protected void assertColMinAndMax(String colName, ColumnGenerator generator)
+ throws SQLException {
+ Connection conn = getConnection();
+ int minId = getMinRowId(conn);
+ int maxId = getMaxRowId(conn);
+
+ LOG.info("Checking min/max for column " + colName + " with type "
+ + generator.getColumnType());
+
+ Object expectedMin = generator.getVerifyValue(minId);
+ Object expectedMax = generator.getVerifyValue(maxId);
+
+ assertColValForRowId(minId, colName, expectedMin);
+ assertColValForRowId(maxId, colName, expectedMax);
+ }
+
+ public void testSupportedAvroTypes() throws IOException, SQLException {
+ String[] argv = {};
+ final int TOTAL_RECORDS = 1 * 10;
+
+ byte[] b = new byte[] { (byte) 1, (byte) 2 };
+ Schema fixed = Schema.createFixed("myfixed", null, null, 2);
+ Schema enumeration = Schema.createEnum("myenum", null, null,
+ Lists.newArrayList("a", "b"));
+
+ ColumnGenerator[] gens = new ColumnGenerator[] {
+ colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
+ colGenerator(100, Schema.create(Schema.Type.INT), 100, "INTEGER"),
+ colGenerator(200L, Schema.create(Schema.Type.LONG), 200L, "BIGINT"),
+ // HSQLDB maps REAL to double, not float:
+ colGenerator(1.0f, Schema.create(Schema.Type.FLOAT), 1.0d, "REAL"),
+ colGenerator(2.0d, Schema.create(Schema.Type.DOUBLE), 2.0d, "DOUBLE"),
+ colGenerator("s", Schema.create(Schema.Type.STRING), "s", "VARCHAR(8)"),
+ colGenerator(ByteBuffer.wrap(b), Schema.create(Schema.Type.BYTES),
+ b, "VARBINARY(8)"),
+ colGenerator(new GenericData.Fixed(fixed, b), fixed,
+ b, "BINARY(2)"),
+ colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
+ "a", "VARCHAR(8)"),
+ };
+ createAvroFile(0, TOTAL_RECORDS, gens);
+ createTable(gens);
+ runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+ verifyExport(TOTAL_RECORDS);
+ for (int i = 0; i < gens.length; i++) {
+ assertColMinAndMax(forIdx(i), gens[i]);
+ }
+ }
+
+ public void testNullableField() throws IOException, SQLException {
+ String[] argv = {};
+ final int TOTAL_RECORDS = 1 * 10;
+
+ List<Schema> childSchemas = new ArrayList<Schema>();
+ childSchemas.add(Schema.create(Schema.Type.STRING));
+ childSchemas.add(Schema.create(Schema.Type.NULL));
+ Schema schema = Schema.createUnion(childSchemas);
+ ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)");
+ ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)");
+ createAvroFile(0, TOTAL_RECORDS, gen0, gen1);
+ createTable(gen0, gen1);
+ runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+ verifyExport(TOTAL_RECORDS);
+ assertColMinAndMax(forIdx(0), gen0);
+ assertColMinAndMax(forIdx(1), gen1);
+ }
+
+ public void testAvroRecordsNotSupported() throws IOException, SQLException {
+ String[] argv = {};
+ final int TOTAL_RECORDS = 1;
+
+ Schema schema = Schema.createRecord("nestedrecord", null, null, false);
+ schema.setFields(Lists.newArrayList(buildAvroField("myint",
+ Schema.Type.INT)));
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("myint", 100);
+ // DB type is not used so can be anything:
+ ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
+ createAvroFile(0, TOTAL_RECORDS, gen);
+ createTable(gen);
+ try {
+ runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+ fail("Avro records can not be exported.");
+ } catch (Exception e) {
+ // expected
+ assertTrue(true);
+ }
+ }
+
+ public void testMissingDatabaseFields() throws IOException, SQLException {
+ String[] argv = {};
+ final int TOTAL_RECORDS = 1;
+
+ // null column type means don't create a database column
+ // the Avro value will not be exported
+ ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT),
+ null, null);
+ createAvroFile(0, TOTAL_RECORDS, gen);
+ createTable(gen);
+ runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+ verifyExport(TOTAL_RECORDS);
+ }
+
+ public void testMissingAvroFields() throws IOException, SQLException {
+ String[] argv = {};
+ final int TOTAL_RECORDS = 1;
+
+ // null Avro schema means don't create an Avro field
+ ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)");
+ createAvroFile(0, TOTAL_RECORDS, gen);
+ createTable(gen);
+ try {
+ runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+ fail("Missing Avro field.");
+ } catch (Exception e) {
+ // expected
+ assertTrue(true);
+ }
+ }
+
+}
Added: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java?rev=1156405&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java (added)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java Wed Aug 10 23:58:07 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.ExportTool;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Tests importing a database table as an Avro Data File then back to the
+ * database.
+ */
+public class TestAvroImportExportRoundtrip extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory
+ .getLog(TestAvroImportExportRoundtrip.class.getName());
+
+ public void testRoundtrip() throws IOException, SQLException {
+ String[] argv = {};
+
+ runImport(getOutputArgv(true));
+ deleteTableData();
+ runExport(getExportArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+
+ checkFirstColumnSum();
+ }
+
+ /**
+ * Create the argv to pass to Sqoop.
+ *
+ * @return the argv as an array of strings.
+ */
+ protected String[] getOutputArgv(boolean includeHadoopFlags) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+
+ args.add("--table");
+ args.add(HsqldbTestServer.getTableName());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--split-by");
+ args.add("INTFIELD1");
+ args.add("--as-avrodatafile");
+
+ return args.toArray(new String[0]);
+ }
+
+ /**
+ * Create the argv to pass to Sqoop.
+ * @param includeHadoopFlags if true, then include -D various.settings=values
+ * @param rowsPerStmt number of rows to export in a single INSERT statement.
+ * @param statementsPerTx number of statements to use in a transaction.
+ * @return the argv as an array of strings.
+ */
+ protected String [] getExportArgv(boolean includeHadoopFlags,
+ int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ // Any additional Hadoop flags (-D foo=bar) are prepended.
+ if (null != additionalArgv) {
+ boolean prevIsFlag = false;
+ for (String arg : additionalArgv) {
+ if (arg.equals("-D")) {
+ args.add(arg);
+ prevIsFlag = true;
+ } else if (prevIsFlag) {
+ args.add(arg);
+ prevIsFlag = false;
+ }
+ }
+ }
+
+ // The sqoop-specific additional args are then added.
+ if (null != additionalArgv) {
+ boolean prevIsFlag = false;
+ for (String arg : additionalArgv) {
+ if (arg.equals("-D")) {
+ prevIsFlag = true;
+ continue;
+ } else if (prevIsFlag) {
+ prevIsFlag = false;
+ continue;
+ } else {
+ // normal argument.
+ args.add(arg);
+ }
+ }
+ }
+
+ args.add("--table");
+ args.add(getTableName());
+ args.add("--export-dir");
+ args.add(getTablePath().toString());
+ args.add("--connect");
+ args.add(getConnectString());
+ args.add("-m");
+ args.add("1");
+
+ LOG.debug("args:");
+ for (String a : args) {
+ LOG.debug(" " + a);
+ }
+
+ return args.toArray(new String[0]);
+ }
+
+ // this test just uses the two int table.
+ protected String getTableName() {
+ return HsqldbTestServer.getTableName();
+ }
+
+ private void deleteTableData() throws SQLException {
+ Connection conn = getConnection();
+ PreparedStatement statement = conn.prepareStatement(
+ "DELETE FROM " + getTableName(),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ }
+
+ /**
+ * Run a MapReduce-based export (using the argv provided to control
+ * execution).
+ * @return the generated jar filename
+ */
+ protected List<String> runExport(String [] argv) throws IOException {
+ // run the tool through the normal entry-point.
+ int ret;
+ List<String> generatedJars = null;
+ try {
+ ExportTool exporter = new ExportTool();
+ Sqoop sqoop = new Sqoop(exporter);
+ ret = Sqoop.runSqoop(sqoop, argv);
+ generatedJars = exporter.getGeneratedJarFiles();
+ } catch (Exception e) {
+ LOG.error("Got exception running Sqoop: "
+ + StringUtils.stringifyException(e));
+ ret = 1;
+ }
+
+ // expect a successful return.
+ if (0 != ret) {
+ throw new IOException("Failure during job; return status " + ret);
+ }
+
+ return generatedJars;
+ }
+
+ private void checkFirstColumnSum() throws SQLException {
+ Connection conn = getConnection();
+
+ PreparedStatement statement = conn.prepareStatement(
+ "SELECT SUM(INTFIELD1) FROM " + getTableName(),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ int actualVal = 0;
+ try {
+ ResultSet rs = statement.executeQuery();
+ try {
+ rs.next();
+ actualVal = rs.getInt(1);
+ } finally {
+ rs.close();
+ }
+ } finally {
+ statement.close();
+ }
+
+ assertEquals("First column column sum", HsqldbTestServer.getFirstColSum(),
+ actualVal);
+ }
+}
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExport.java Wed Aug 10 23:58:07 2011
@@ -51,28 +51,11 @@ import com.cloudera.sqoop.testutil.Expor
import com.cloudera.sqoop.tool.CodeGenTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
-import org.junit.Before;
-
/**
* Test that we can export data from HDFS into databases.
*/
public class TestExport extends ExportJobTestCase {
- @Before
- public void setUp() {
- // start the server
- super.setUp();
-
- if (useHsqldbTestServer()) {
- // throw away any existing data that might be in the database.
- try {
- this.getTestServer().dropExistingSchema();
- } catch (SQLException sqlE) {
- fail(sqlE.toString());
- }
- }
- }
-
/**
* @return an argv for the CodeGenTool to use when creating tables to export.
*/
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java Wed Aug 10 23:58:07 2011
@@ -43,21 +43,6 @@ import org.junit.Before;
*/
public class TestExportUpdate extends ExportJobTestCase {
- @Before
- public void setUp() {
- // start the server
- super.setUp();
-
- if (useHsqldbTestServer()) {
- // throw away any existing data that might be in the database.
- try {
- this.getTestServer().dropExistingSchema();
- } catch (SQLException sqlE) {
- fail(sqlE.toString());
- }
- }
- }
-
@Override
protected String getTablePrefix() {
return "UPDATE_TABLE_";
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java Wed Aug 10 23:58:07 2011
@@ -41,6 +41,7 @@ import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.tool.ImportTool;
+import com.google.common.collect.ObjectArrays;
import junit.framework.TestCase;
@@ -132,6 +133,19 @@ public abstract class BaseSqoopTestCase
protected ConnManager getManager() {
return manager;
}
+
+
+ /**
+ * @return a connection to the database under test.
+ */
+ protected Connection getConnection() {
+ try {
+ return getTestServer().getConnection();
+ } catch (SQLException sqlE) {
+ LOG.error("Could not get connection to test server: " + sqlE);
+ return null;
+ }
+ }
// instance variables populated during setUp, used during tests
private HsqldbTestServer testServer;
@@ -439,4 +453,23 @@ public abstract class BaseSqoopTestCase
getManager().release();
}
}
+
+ /**
+ * Create a new string array with 'moreEntries' appended to the 'entries'
+ * array.
+ * @param entries initial entries in the array
+ * @param moreEntries variable-length additional entries.
+ * @return an array containing entries with all of moreEntries appended.
+ */
+ protected String [] newStrArray(String [] entries, String... moreEntries) {
+ if (null == moreEntries) {
+ return entries;
+ }
+
+ if (null == entries) {
+ entries = new String[0];
+ }
+
+ return ObjectArrays.concat(entries, moreEntries, String.class);
+ }
}
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java?rev=1156405&r1=1156404&r2=1156405&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java Wed Aug 10 23:58:07 2011
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
@@ -42,6 +43,21 @@ public abstract class ExportJobTestCase
public static final Log LOG = LogFactory.getLog(
ExportJobTestCase.class.getName());
+
+ @Before
+ public void setUp() {
+ // start the server
+ super.setUp();
+
+ if (useHsqldbTestServer()) {
+ // throw away any existing data that might be in the database.
+ try {
+ this.getTestServer().dropExistingSchema();
+ } catch (SQLException sqlE) {
+ fail(sqlE.toString());
+ }
+ }
+ }
protected String getTablePrefix() {
return "EXPORT_TABLE_";
@@ -57,18 +73,6 @@ public abstract class ExportJobTestCase
}
/**
- * @return a connection to the database under test.
- */
- protected Connection getConnection() {
- try {
- return getTestServer().getConnection();
- } catch (SQLException sqlE) {
- LOG.error("Could not get connection to test server: " + sqlE);
- return null;
- }
- }
-
- /**
* Create the argv to pass to Sqoop.
* @param includeHadoopFlags if true, then include -D various.settings=values
* @param rowsPerStmt number of rows to export in a single INSERT statement.