You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/03/29 23:57:06 UTC
svn commit: r1307132 - in /sqoop/trunk/src/java/org/apache/sqoop:
manager/ConnManager.java mapreduce/AvroImportMapper.java
Author: blee
Date: Thu Mar 29 21:57:06 2012
New Revision: 1307132
URL: http://svn.apache.org/viewvc?rev=1307132&view=rev
Log:
SQOOP-465 BLOB support for Avro import
Modified:
sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java
sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
Modified: sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java?rev=1307132&r1=1307131&r2=1307132&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java Thu Mar 29 21:57:06 2012
@@ -190,8 +190,10 @@ public abstract class ConnManager {
case Types.TIME:
case Types.TIMESTAMP:
return Type.LONG;
+ case Types.BLOB:
case Types.BINARY:
case Types.VARBINARY:
+ case Types.LONGVARBINARY:
return Type.BYTES;
default:
throw new IllegalArgumentException("Cannot convert SQL type "
Modified: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java?rev=1307132&r1=1307131&r2=1307132&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java Thu Mar 29 21:57:06 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
+import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Map;
@@ -32,8 +33,10 @@ import org.apache.avro.mapred.AvroWrappe
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
@@ -47,19 +50,37 @@ public class AvroImportMapper
private final AvroWrapper<GenericRecord> wrapper =
new AvroWrapper<GenericRecord>();
private Schema schema;
+ private LargeObjectLoader lobLoader;
@Override
- protected void setup(Context context) {
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+ lobLoader = new LargeObjectLoader(context.getConfiguration(),
+ FileOutputFormat.getWorkOutputPath(context));
}
@Override
protected void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
+
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ val.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+
wrapper.datum(toGenericRecord(val));
context.write(wrapper, NullWritable.get());
}
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
private GenericRecord toGenericRecord(SqoopRecord val) {
Map<String, Object> fieldMap = val.getFieldMap();
@@ -88,10 +109,13 @@ public class AvroImportMapper
} else if (o instanceof BytesWritable) {
BytesWritable bw = (BytesWritable) o;
return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
+ } else if (o instanceof BlobRef) {
+ BlobRef br = (BlobRef) o;
+ // If blob data is stored in an external .lob file, save the ref file
+ // as Avro bytes. If materialized inline, save blob data as Avro bytes.
+ return br.isExternal() ? br.toString().getBytes() : br.getData();
} else if (o instanceof ClobRef) {
throw new UnsupportedOperationException("ClobRef not suported");
- } else if (o instanceof BlobRef) {
- throw new UnsupportedOperationException("BlobRef not suported");
}
// primitive types (Integer, etc) are left unchanged
return o;