You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/04/25 23:13:16 UTC
svn commit: r1096606 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
test/org/apache/pig/test/TestPruneColumn.java
Author: daijy
Date: Mon Apr 25 21:13:16 2011
New Revision: 1096606
URL: http://svn.apache.org/viewvc?rev=1096606&view=rev
Log:
PIG-1981: LoadPushDown.pushProjection should pass alias in addition to position
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1096606&r1=1096605&r2=1096606&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 25 21:13:16 2011
@@ -44,6 +44,8 @@ PIG-1876: Typed map for Pig (daijy)
IMPROVEMENTS
+PIG-1981: LoadPushDown.pushProjection should pass alias in addition to position (daijy)
+
PIG-2006: Regression: NPE when Pig processes an empty script file, fix test case (xuefu)
PIG-2006: Regression: NPE when Pig processes an empty script file (xuefu)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1096606&r1=1096605&r2=1096606&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Mon Apr 25 21:13:16 2011
@@ -94,6 +94,7 @@ public class ColumnPruneVisitor extends
if( required.first != null && required.first.containsKey(i) ) {
requiredField = new RequiredField();
requiredField.setIndex(i);
+ requiredField.setAlias(s.getField(i).alias);
requiredField.setType(s.getField(i).type);
List<RequiredField> subFields = new ArrayList<RequiredField>();
for( String key : required.first.get(i) ) {
@@ -106,6 +107,7 @@ public class ColumnPruneVisitor extends
if( required.second != null && required.second.contains(i) ) {
requiredField = new RequiredField();
requiredField.setIndex(i);
+ requiredField.setAlias(s.getField(i).alias);
requiredField.setType(s.getField(i).type);
requiredFields.add(requiredField);
}
Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1096606&r1=1096605&r2=1096606&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Mon Apr 25 21:13:16 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -32,15 +33,30 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.pig.ExecType;
import org.apache.pig.FilterFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
import org.apache.pig.PigServer;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.LoadPushDown.RequiredFieldResponse;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
import org.junit.After;
import org.junit.Before;
@@ -1980,4 +1996,77 @@ public class TestPruneColumn extends Tes
reader1.close();
reader2.close();
}
+ // Test-only LoadFunc: records the aliases Pig passes to pushProjection and returns them as a single output tuple.
+ static public class PruneColumnEvalFunc extends LoadFunc implements LoadPushDown { // NOTE(review): despite the "EvalFunc" name this is a loader (LoadFunc + LoadPushDown)
+ String[] aliases; // aliases from the RequiredFieldList, in list order; null until pushProjection or the first getNext fills it
+ String signature; // UDFContext signature from setUDFContextSignature; used as the property key for the serialized aliases
+ public PruneColumnEvalFunc() {} // no-arg constructor: the test script instantiates this class by name with "()"
+ @Override
+ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException { // captures the alias (not just the index) of each required field
+ aliases = new String[requiredFieldList.getFields().size()];
+ for (int i=0; i<requiredFieldList.getFields().size(); i++) {
+ RequiredField fs = requiredFieldList.getFields().get(i);
+ aliases[i] = fs.getAlias(); // set by ColumnPruneVisitor via requiredField.setAlias(...) in this same commit
+ }
+ try { // store the aliases in UDFContext under this loader's signature — presumably so the instance that runs getNext can read them back; verify
+ UDFContext.getUDFContext().getUDFProperties(this.getClass()).setProperty(signature, ObjectSerializer.serialize(aliases));
+ } catch (IOException e) {
+ throw new FrontendException(e); // serialization failure surfaces as a frontend error with the cause preserved
+ }
+ return new RequiredFieldResponse(true); // reports the projection as honored (constructor flag; confirm against LoadPushDown.RequiredFieldResponse)
+ }
+ // Only projection pushdown is advertised to Pig.
+ @Override
+ public List<OperatorSet> getFeatures() {
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+ // The load location is handed straight to FileInputFormat as the job's input path.
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ }
+ // A text input format is configured, but its records are never read (prepareToRead ignores the reader).
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ return new PigTextInputFormat();
+ }
+ // Intentional no-op: this loader never consumes the RecordReader.
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ throws IOException {
+ }
+ // Remember the signature so pushProjection and getNext use the same UDFContext property key.
+ @Override
+ public void setUDFContextSignature(String signature) {
+ this.signature = signature;
+ }
+ // Emits exactly one tuple holding the recorded aliases, then null (end of input).
+ @Override
+ public Tuple getNext() throws IOException {
+ if (aliases==null) { // first call on this instance; NOTE(review): if this same instance already ran pushProjection, aliases is non-null and no tuple is emitted — verify
+ aliases = (String[])ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty(signature)); // NOTE(review): if no property exists for this signature, confirm how deserialize handles null; a null result would NPE in the loop below
+ Tuple t = TupleFactory.getInstance().newTuple();
+ for (String s : aliases) // one field per alias, in RequiredFieldList order
+ t.append(s);
+ return t;
+ }
+ return null; // every later call: no more tuples
+ }
+ }
+ // PIG-1981: verifies that LoadPushDown.pushProjection receives field aliases, not just positions.
+ public void testAliasInRequiredFieldList() throws Exception{
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
+ + PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);"); // declared schema (a0, a1, a2); the loader ignores the file's contents
+ pigServer.registerQuery("B = foreach A generate a1, a2;"); // projecting a1, a2 should push exactly those two fields to the loader
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+ // The loader emits one tuple whose fields are the pushed-down aliases.
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+ // Both aliases must arrive, in projected order.
+ assertTrue(t.size()==2);
+ assertTrue(t.get(0).equals("a1"));
+ assertTrue(t.get(1).equals("a2"));
+ // The loader produces only that single tuple.
+ assertFalse(iter.hasNext());
+ }
}