You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2017/01/18 23:50:16 UTC

svn commit: r1779401 - in /pig/branches/branch-0.16: CHANGES.txt src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java test/org/apache/pig/builtin/TestAvroStorage.java test/org/apache/pig/builtin/avro/code/pig/group_test.pig

Author: daijy
Date: Wed Jan 18 23:50:15 2017
New Revision: 1779401

URL: http://svn.apache.org/viewvc?rev=1779401&view=rev
Log:
PIG-5108: AvroStorage on Tez with exception on nested records

Added:
    pig/branches/branch-0.16/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
    pig/branches/branch-0.16/test/org/apache/pig/builtin/TestAvroStorage.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1779401&r1=1779400&r2=1779401&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Jan 18 23:50:15 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5108: AvroStorage on Tez with exception on nested records (daijy)
+
 PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
 
 PIG-4918: Pig on Tez cannot switch pig.temp.dir to another fs (daijy)

Modified: pig/branches/branch-0.16/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1779401&r1=1779400&r2=1779401&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Wed Jan 18 23:50:15 2017
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -49,6 +50,7 @@ import java.util.Map;
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
@@ -64,9 +66,9 @@ public final class AvroTupleWrapper <T e
   }
 
   @Override
-  public void write(final DataOutput o) throws IOException {
-    throw new IOException(
-        this.getClass().toString() + ".write called, but not implemented yet");
+  public void write(DataOutput out) throws IOException {
+      Tuple t = mTupleFactory.newTupleNoCopy(getAll());
+      t.write(out);
   }
 
   @SuppressWarnings("rawtypes")

Modified: pig/branches/branch-0.16/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1779401&r1=1779400&r2=1779401&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/builtin/TestAvroStorage.java Wed Jan 18 23:50:15 2017
@@ -709,6 +709,19 @@ public class TestAvroStorage {
     }
 
     @Test
+    public void testGroupWithRepeatedSubRecords() throws Exception {
+      final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      testAvroStorage(true, basedir + "code/pig/group_test.pig",
+          ImmutableMap.of(
+              "INFILE",           input,
+              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
+              "OUTFILE",          createOutputName())
+        );
+      verifyResults(createOutputName(),check);
+    }
+
+    @Test
     public void testLoadDirectory() throws Exception {
       final String input = basedir + "data/avro/uncompressed/testdirectory";
       final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";

Added: pig/branches/branch-0.16/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/builtin/avro/code/pig/group_test.pig?rev=1779401&view=auto
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/builtin/avro/code/pig/group_test.pig (added)
+++ pig/branches/branch-0.16/test/org/apache/pig/builtin/avro/code/pig/group_test.pig Wed Jan 18 23:50:15 2017
@@ -0,0 +1,5 @@
+in = LOAD '$INFILE' USING AvroStorage();
+grouped = GROUP in BY (value1.thing);
+flattened = FOREACH grouped GENERATE flatten(in) as (key: chararray,value1: (thing: chararray,count: int),value2: (thing: chararray,count: int));
+RMF $OUTFILE;
+STORE flattened INTO '$OUTFILE' USING AvroStorage();