You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by da...@apache.org on 2011/04/25 23:13:16 UTC

svn commit: r1096606 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java test/org/apache/pig/test/TestPruneColumn.java

Author: daijy
Date: Mon Apr 25 21:13:16 2011
New Revision: 1096606

URL: http://svn.apache.org/viewvc?rev=1096606&view=rev
Log:
PIG-1981: LoadPushDown.pushProjection should pass alias in addition to position

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1096606&r1=1096605&r2=1096606&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 25 21:13:16 2011
@@ -44,6 +44,8 @@ PIG-1876: Typed map for Pig (daijy)
 
 IMPROVEMENTS
 
+PIG-1981: LoadPushDown.pushProjection should pass alias in addition to position (daijy)
+
 PIG-2006: Regression: NPE when Pig processes an empty script file, fix test case (xuefu)
 
 PIG-2006: Regression: NPE when Pig processes an empty script file (xuefu)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1096606&r1=1096605&r2=1096606&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Mon Apr 25 21:13:16 2011
@@ -94,6 +94,7 @@ public class ColumnPruneVisitor extends 
             if( required.first != null && required.first.containsKey(i) ) {
                 requiredField = new RequiredField();
                 requiredField.setIndex(i);
+                requiredField.setAlias(s.getField(i).alias);
                 requiredField.setType(s.getField(i).type);
                 List<RequiredField> subFields = new ArrayList<RequiredField>();
                 for( String key : required.first.get(i) ) {
@@ -106,6 +107,7 @@ public class ColumnPruneVisitor extends 
             if( required.second != null && required.second.contains(i) ) {
                 requiredField = new RequiredField();
                 requiredField.setIndex(i);
+                requiredField.setAlias(s.getField(i).alias);
                 requiredField.setType(s.getField(i).type);      
                 requiredFields.add(requiredField);
             }

Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1096606&r1=1096605&r2=1096606&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Mon Apr 25 21:13:16 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,15 +33,30 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.pig.ExecType;
 import org.apache.pig.FilterFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
 import org.apache.pig.PigServer;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.LoadPushDown.RequiredFieldResponse;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
 import org.junit.After;
 import org.junit.Before;
@@ -1980,4 +1996,77 @@ public class TestPruneColumn extends Tes
         reader1.close();
         reader2.close();
     }
+    // Test-only loader (a LoadFunc, despite the "EvalFunc" name): pushProjection() saves the pushed field aliases in the UDFContext and getNext() returns them as one tuple.
+    static public class PruneColumnEvalFunc extends LoadFunc implements LoadPushDown {
+        String[] aliases; // aliases of the required fields; getNext() treats null as "not yet loaded/emitted"
+        String signature; // key for this loader's entry in the UDFContext properties, set by setUDFContextSignature()
+        public PruneColumnEvalFunc() {}
+        @Override
+        public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException { // saves the pushed aliases in the UDFContext, keyed by signature
+            aliases = new String[requiredFieldList.getFields().size()];
+            for (int i=0; i<requiredFieldList.getFields().size(); i++) {
+                RequiredField fs = requiredFieldList.getFields().get(i);
+                aliases[i] = fs.getAlias();
+            }
+            try {
+                UDFContext.getUDFContext().getUDFProperties(this.getClass()).setProperty(signature, ObjectSerializer.serialize(aliases));
+            } catch (IOException e) {
+                throw new FrontendException(e);
+            }
+            return new RequiredFieldResponse(true);
+        }
+        // Only projection push-down is advertised.
+        @Override
+        public List<OperatorSet> getFeatures() {
+            return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+        }
+        // Registers the input path; this loader never consumes the file's records (see getNext()).
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+            FileInputFormat.setInputPaths(job, location);
+        }
+        // Plain text input format, supplied because LoadFunc requires one; its records are ignored.
+        @Override
+        public InputFormat getInputFormat() throws IOException {
+            return new PigTextInputFormat();
+        }
+        // The reader is intentionally not kept: output comes from the pushed aliases, not the input file.
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split)
+                throws IOException {
+        }
+        // Stores the signature used as the UDFContext property key.
+        @Override
+        public void setUDFContextSignature(String signature) {
+            this.signature = signature;
+        }
+        // Output does not depend on the input file: at most one tuple of pushed aliases, then null.
+        @Override
+        public Tuple getNext() throws IOException {
+            if (aliases==null) { // no aliases on this instance yet: load them from the UDFContext and emit them once
+                aliases = (String[])ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty(signature)); // NOTE(review): assumes pushProjection() stored a value; confirm behavior if the property is missing
+                Tuple t = TupleFactory.getInstance().newTuple();
+                for (String s : aliases)
+                    t.append(s);
+                return t;
+            }
+            return null; // aliases already set on this instance: no more tuples
+        }
+    }
+    // Regression test for PIG-1981: the field aliases (not only positions) must reach LoadPushDown.pushProjection().
+    public void testAliasInRequiredFieldList() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
+                + PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate a1, a2;");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        // PruneColumnEvalFunc ignores tmpFile1's contents and yields the pushed aliases as a single tuple.
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        // Only a1 and a2 are used by B, so exactly those two aliases should have been pushed, in schema order.
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).equals("a1"));
+        assertTrue(t.get(1).equals("a2"));
+        // Exactly one tuple is expected.
+        assertFalse(iter.hasNext());
+    }
 }