You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ki...@apache.org on 2011/08/05 03:01:30 UTC

svn commit: r1154059 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java test/com/cloudera/sqoop/TestAvroImport.java

Author: kimballa
Date: Fri Aug  5 01:01:30 2011
New Revision: 1154059

URL: http://svn.apache.org/viewvc?rev=1154059&view=rev
Log:
SQOOP-308. Generated Avro Schema cannot handle nullable fields.


Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java?rev=1154059&r1=1154058&r2=1154059&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java Fri Aug  5 01:01:30 2011
@@ -18,9 +18,6 @@
 
 package com.cloudera.sqoop.orm;
 
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.manager.ConnManager;
-
 import java.io.IOException;
 import java.sql.Types;
 import java.util.ArrayList;
@@ -31,6 +28,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+
 /**
  * Creates an Avro schema to represent a table from a database.
  */
@@ -106,7 +106,12 @@ public class AvroSchemaGenerator {
   }
 
   public Schema toAvroSchema(int sqlType) {
-    return Schema.create(toAvroType(sqlType));
+    // All types are assumed nullable; make a union of the "true" type for
+    // a column and NULL.
+    List<Schema> childSchemas = new ArrayList<Schema>();
+    childSchemas.add(Schema.create(toAvroType(sqlType)));
+    childSchemas.add(Schema.create(Schema.Type.NULL));
+    return Schema.createUnion(childSchemas);
   }
 
 }

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java?rev=1154059&r1=1154058&r2=1154059&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java Fri Aug  5 01:01:30 2011
@@ -18,12 +18,10 @@
 
 package com.cloudera.sqoop;
 
-import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.HsqldbTestServer;
-import com.cloudera.sqoop.testutil.ImportJobTestCase;
-
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -39,6 +37,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
 /**
  * Tests --as-avrodatafile.
  */
@@ -89,16 +92,67 @@ public class TestAvroImport extends Impo
     assertEquals(2, fields.size());
 
     assertEquals("INTFIELD1", fields.get(0).name());
-    assertEquals(Schema.Type.INT, fields.get(0).schema().getType());
+    assertEquals(Schema.Type.UNION, fields.get(0).schema().getType());
+    assertEquals(Schema.Type.INT, fields.get(0).schema().getTypes().get(0).getType());
+    assertEquals(Schema.Type.NULL, fields.get(0).schema().getTypes().get(1).getType());
 
     assertEquals("INTFIELD2", fields.get(1).name());
-    assertEquals(Schema.Type.INT, fields.get(1).schema().getType());
+    assertEquals(Schema.Type.UNION, fields.get(1).schema().getType());
+    assertEquals(Schema.Type.INT, fields.get(1).schema().getTypes().get(0).getType());
+    assertEquals(Schema.Type.NULL, fields.get(1).schema().getTypes().get(1).getType());
 
     GenericRecord record1 = reader.next();
     assertEquals(1, record1.get("INTFIELD1"));
     assertEquals(8, record1.get("INTFIELD2"));
   }
 
+  public void testNullableAvroImport() throws IOException, SQLException {
+    addNullRecord(); // Add a pair of NULL values to twointtable.
+    runImport(getOutputArgv(true));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    boolean foundNullRecord = false;
+
+    // Iterate through all the records in the output file, looking for one
+    // that matches (NULL, NULL).
+    for (GenericRecord record : reader) {
+      LOG.debug("Input record: " + record);
+      if (record.get("INTFIELD1") == null && record.get("INTFIELD2") == null) {
+        LOG.debug("Got null record");
+        foundNullRecord = true;
+      }
+    }
+
+    assertTrue(foundNullRecord);
+  }
+
+  /**
+   * Add a record to the TWOINTTABLE that contains (NULL, NULL).
+   *
+   * @throws SQLException if there's a problem doing the INSERT statement.
+   */
+  private void addNullRecord() throws SQLException {
+    Connection connection = null;
+    Statement st = null;
+    try {
+      connection = this.getManager().getConnection();
+      st = connection.createStatement();
+      st.executeUpdate("INSERT INTO " + getTableName()
+          + " VALUES(NULL, NULL)");
+
+      connection.commit();
+    } finally {
+      if (null != st) {
+        st.close();
+      }
+
+      if (null != connection) {
+        connection.close();
+      }
+    }
+  }
+
   private DataFileReader<GenericRecord> read(Path filename) throws IOException {
     Configuration conf = new Configuration();
     if (!BaseSqoopTestCase.isOnPhysicalCluster()) {