You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/03/29 23:57:06 UTC

svn commit: r1307132 - in /sqoop/trunk/src/java/org/apache/sqoop: manager/ConnManager.java mapreduce/AvroImportMapper.java

Author: blee
Date: Thu Mar 29 21:57:06 2012
New Revision: 1307132

URL: http://svn.apache.org/viewvc?rev=1307132&view=rev
Log:
SQOOP-465 BLOB support for Avro import

Modified:
    sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java

Modified: sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java?rev=1307132&r1=1307131&r2=1307132&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java Thu Mar 29 21:57:06 2012
@@ -190,8 +190,10 @@ public abstract class ConnManager {
     case Types.TIME:
     case Types.TIMESTAMP:
       return Type.LONG;
+    case Types.BLOB:
     case Types.BINARY:
     case Types.VARBINARY:
+    case Types.LONGVARBINARY:
       return Type.BYTES;
     default:
       throw new IllegalArgumentException("Cannot convert SQL type "

Modified: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java?rev=1307132&r1=1307131&r2=1307132&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java Thu Mar 29 21:57:06 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Map;
@@ -32,8 +33,10 @@ import org.apache.avro.mapred.AvroWrappe
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import com.cloudera.sqoop.lib.BlobRef;
 import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
 import com.cloudera.sqoop.lib.SqoopRecord;
 import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 
@@ -47,19 +50,37 @@ public class AvroImportMapper
   private final AvroWrapper<GenericRecord> wrapper =
     new AvroWrapper<GenericRecord>();
   private Schema schema;
+  private LargeObjectLoader lobLoader;
 
   @Override
-  protected void setup(Context context) {
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
     schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+    lobLoader = new LargeObjectLoader(context.getConfiguration(),
+        FileOutputFormat.getWorkOutputPath(context));
   }
 
   @Override
   protected void map(LongWritable key, SqoopRecord val, Context context)
       throws IOException, InterruptedException {
+
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      val.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+
     wrapper.datum(toGenericRecord(val));
     context.write(wrapper, NullWritable.get());
   }
 
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
 
   private GenericRecord toGenericRecord(SqoopRecord val) {
     Map<String, Object> fieldMap = val.getFieldMap();
@@ -88,10 +109,13 @@ public class AvroImportMapper
     } else if (o instanceof BytesWritable) {
       BytesWritable bw = (BytesWritable) o;
       return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
+    } else if (o instanceof BlobRef) {
+      BlobRef br = (BlobRef) o;
+      // If blob data is stored in an external .lob file, save the ref file
+      // as Avro bytes. If materialized inline, save blob data as Avro bytes.
+      return br.isExternal() ? br.toString().getBytes() : br.getData();
     } else if (o instanceof ClobRef) {
       throw new UnsupportedOperationException("ClobRef not suported");
-    } else if (o instanceof BlobRef) {
-      throw new UnsupportedOperationException("BlobRef not suported");
     }
     // primitive types (Integer, etc) are left unchanged
     return o;