You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/12/08 20:54:25 UTC
svn commit: r724462 - in /hadoop/pig/branches/types: ./
src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/
test/org/apache/pig/test/utils/
Author: olga
Date: Mon Dec 8 11:54:25 2008
New Revision: 724462
URL: http://svn.apache.org/viewvc?rev=724462&view=rev
Log:
PIG-546: FilterFunc calls empty constructor when it should be calling parameterized constructor
Added:
hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Dec 8 11:54:25 2008
@@ -325,3 +325,6 @@
PIG-538: support for null constants (pradeepk via olgan)
PIG-385: more null handling (pradeepl via olgan)
+
+ PIG-546: FilterFunc calls empty constructor when it should be calling
+ parameterized constructor
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Dec 8 11:54:25 2008
@@ -54,6 +54,9 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.EvalFunc;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.impl.plan.VisitorException;
@@ -615,6 +618,33 @@
}
}
+/**
+ * Function-type tags used by the parser to check that a UDF named in a query
+ * (directly or through a DEFINE alias) implements the base type that the
+ * surrounding clause requires.
+ */
+class FunctionType {
+    public static final byte UNKNOWNFUNC = 0;
+    public static final byte EVALFUNC = 2;
+    public static final byte COMPARISONFUNC = 4;
+    public static final byte LOADFUNC = 8;
+    public static final byte STOREFUNC = 16;
+
+    /**
+     * Verifies that func is an instance of the UDF base type denoted by
+     * funcType.
+     *
+     * A null func is accepted (as a plain cast of null would be); callers
+     * report a failed instantiation separately.
+     *
+     * @param func the instantiated function object, may be null
+     * @param funcType EVALFUNC, COMPARISONFUNC, LOADFUNC or STOREFUNC
+     * @throws ClassCastException if func is not of the expected type; the
+     *         message names both the actual class and the expected type
+     *         because callers surface it directly as a parse error
+     * @throws Exception if funcType is not one of the known tags
+     */
+    public static void tryCasting(Object func, byte funcType) throws Exception {
+        Class<?> expected;
+        switch(funcType) {
+        case FunctionType.EVALFUNC:
+            expected = EvalFunc.class;
+            break;
+        case FunctionType.COMPARISONFUNC:
+            expected = ComparisonFunc.class;
+            break;
+        case FunctionType.LOADFUNC:
+            expected = LoadFunc.class;
+            break;
+        case FunctionType.STOREFUNC:
+            expected = StoreFunc.class;
+            break;
+        default:
+            throw new Exception("Received an unknown function type: " + funcType);
+        }
+        if (func != null && !expected.isInstance(func)) {
+            throw new ClassCastException(func.getClass().getName()
+                + " is not of the expected type " + expected.getName());
+        }
+    }
+}
+
PARSER_END(QueryParser)
// Skip all the new lines, tabs and spaces
@@ -972,12 +1002,7 @@
{
( filename = FileName()
(
- <USING> funcName = QualifiedFunction() "(" funcArgs = StringList() ")"
- {
- funcSpecAsString = funcName + "(" + funcArgs + ")";
- funcSpec = new FuncSpec(funcSpecAsString);
- log.debug("LoadClause: funcSpec = " + funcSpec);
- }
+ <USING> funcSpec = NonEvalFuncSpec(FunctionType.LOADFUNC)
)?
(
<SPLIT> <BY> t3 = <QUOTEDSTRING>
@@ -990,7 +1015,7 @@
)?
)
{
- if (funcSpecAsString == null){
+ if (funcSpec == null){
funcSpecAsString = PigStorage.class.getName();
funcSpec = new FuncSpec(funcSpecAsString);
log.debug("LoadClause: funcSpec = " + funcSpec);
@@ -1272,23 +1297,7 @@
log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
}
)
-| LOOKAHEAD(EvalFunction() "(")
- (evalFunc=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")"
- {
- FuncSpec funcSpec = new FuncSpec(evalFunc.getClass().getName());
- Type javaType = evalFunc.getReturnType();
- byte type = DataType.findType(javaType);
-
- log.debug("Return type of UDF: " + DataType.findTypeName(type));
- cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, args, type);
- lp.add(cond);
- log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
- for(ExpressionOperator exprOp: args) {
- lp.connect(exprOp, cond);
- log.debug("PUnaryCond: Added operator " + exprOp.getClass().getName() + " " + cond + " to logical plan " + lp);
- }
- }
- )
+| LOOKAHEAD(EvalFuncSpec(over, specs, lp, input)) cond = EvalFuncSpec(over,specs,lp, input, FunctionType.EVALFUNC)
| cond = PNullCond(over,specs,lp,input)
| cond = PNotCond(over,specs,lp,input)
@@ -1433,6 +1442,7 @@
boolean asc = true;
String funcName = null;
Token t1;
+ FuncSpec funcSpec = null;
log.trace("Entering OrderClause");
}
{
@@ -1461,13 +1471,13 @@
}
)
(
- <USING> funcName = QualifiedFunction()
+ <USING> funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC)
)?
)
{
LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder,
- (funcName != null ? new FuncSpec(funcName) : null));
+ funcSpec );
sort.setStar(star);
sort.setLimit(-1);
lp.add(sort);
@@ -1860,17 +1870,16 @@
String stream, deserializer;
StreamingCommand.HandleSpec[] handleSpecs;
String functionName = "PigStorage", functionArgs="";
+ byte funcType = (handle.compareTo(StreamingCommand.Handle.INPUT) != 0 ? FunctionType.LOADFUNC : FunctionType.STOREFUNC) ;
+ FuncSpec funcSpec = null;
}
{
stream = CommandStream()
[
- <USING> functionName = QualifiedFunction()
- [
- "(" functionArgs = StringList() ")"
- ]
+ <USING> funcSpec = NonEvalFuncSpec(funcType)
]
{
- deserializer = functionName + "(" + functionArgs + ")";
+ deserializer = (funcSpec == null? functionName + "(" + ")" : funcSpec.toString());
command.addHandleSpec(handle,
new HandleSpec(stream, deserializer)
);
@@ -1879,13 +1888,10 @@
","
stream = CommandStream()
[
- <USING> functionName = QualifiedFunction()
- [
- "(" functionArgs = StringList() ")"
- ]
+ <USING> funcSpec = NonEvalFuncSpec(funcType)
]
{
- deserializer = functionName + "(" + functionArgs + ")";
+ deserializer = (funcSpec == null? functionName + "(" + ")" : funcSpec.toString());
command.addHandleSpec(handle,
new HandleSpec(stream, deserializer)
);
@@ -1921,22 +1927,20 @@
}
}
LogicalOperator StoreClause(LogicalPlan lp) : {LogicalOperator lo; Token t; String fileName; String functionSpec = null;
- String functionName, functionArgs;}
+ String functionName, functionArgs; FuncSpec funcSpec = null;}
{
t = <IDENTIFIER> <INTO> fileName = FileName()
(
- <USING> functionName = QualifiedFunction() {functionSpec = functionName;}
- (
- "(" functionArgs = StringList() ")" {functionSpec = functionSpec + "(" + functionArgs + ")";}
- )?
+ <USING> funcSpec = NonEvalFuncSpec(FunctionType.STOREFUNC)
)?
{
- if (functionSpec == null)
- functionSpec = PigStorage.class.getName() + "()";
+ if (funcSpec == null) {
+ funcSpec = new FuncSpec(PigStorage.class.getName() + "()");
+ }
LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),
- new FileSpec(fileName, new FuncSpec(functionSpec)));
+ new FileSpec(fileName, funcSpec));
LogicalOperator input = mapAliasOp.get(t.image);
if (input == null)
@@ -1975,7 +1979,7 @@
(
t = <IDENTIFIER> "="
(
- LOOKAHEAD(FuncEvalSpec(over, specs, lp, input)) item = InfixExpr(over,specs,lp, input)
+ LOOKAHEAD(EvalFuncSpec(over, specs, lp, input, FunctionType.EVALFUNC)) item = InfixExpr(over,specs,lp, input)
{
lp.add(item);
}
@@ -2126,6 +2130,7 @@
LogicalOperator eOp;
Token t;
boolean asc = true;
+ FuncSpec funcSpec = null;
log.trace("Entering NestedSortOrArrange");}
{
(
@@ -2157,13 +2162,13 @@
}
)
(
- <USING> funcName = QualifiedFunction()
+ <USING> funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC)
)?
)
{
log.debug("Before creating LOSort");
LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder,
- (funcName != null ? new FuncSpec(funcName) : null));
+ funcSpec);
sort.setStar(star);
log.debug("After creating LOSort");
try {
@@ -2479,8 +2484,8 @@
LOOKAHEAD(Const(lp)) item = Const(lp)
| (
(
- LOOKAHEAD(FuncEvalSpec(over,specs,lp,input))
- item = FuncEvalSpec(over,specs,lp,input)
+ LOOKAHEAD(EvalFuncSpec(over,specs,lp,input, FunctionType.EVALFUNC))
+ item = EvalFuncSpec(over,specs,lp,input, FunctionType.EVALFUNC)
| item = ColOrSpec(over,specs,lp,input)
| item = BinCond(over,specs,lp,input)
@@ -2547,7 +2552,7 @@
}
-ExpressionOperator FuncEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+ExpressionOperator EvalFuncSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, byte funcType) :
{
String funcName = null;
FuncSpec funcSpec = null;
@@ -2556,55 +2561,117 @@
List<ExpressionOperator> args;
ExpressionOperator userFunc;
LOUserFunc userAliasFunc = null;
- EvalFunc evalFunc = null;
- log.trace("Entering FuncEvalSpec");
+ Object func = null;
+ log.trace("Entering EvalFuncSpec");
}
{
(
(
LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction()
{
-
+ func = pigContext.instantiateFuncFromAlias(funcNameAlias);
try{
- evalFunc = (EvalFunc) pigContext.instantiateFuncFromAlias(funcNameAlias);
- Type javaType = evalFunc.getReturnType();
- log.debug("Type: " + javaType);
- log.debug("funcName: " + funcName + " class name: " + evalFunc.getClass().getName() + " return type: " + DataType.findType(javaType));
- }catch (Exception e){
+ FunctionType.tryCasting(func, funcType);
+ } catch (Exception e){
throw new ParseException(e.getMessage());
}
}
)
-| evalFunc=EvalFunction()
+| func=EvalFunction(funcType)
)
"(" args=EvalArgs(over,specs,lp,input) ")"
{
- if(null != evalFunc) {
- funcName = evalFunc.getClass().getName();
+ if(null != func) {
+ funcName = func.getClass().getName();
if(null != funcNameAlias) {
funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
} else {
funcSpec = new FuncSpec(funcName);
}
- Type javaType = evalFunc.getReturnType();
- byte type = DataType.findType(javaType);
- log.debug("Return type of UDF: " + DataType.findTypeName(type));
- log.debug("FuncEvalSpec: funcSpec: " + funcSpec);
+ byte type = DataType.BYTEARRAY;
+ switch(funcType) {
+ case FunctionType.EVALFUNC:
+ Type javaType = ((EvalFunc)func).getReturnType();
+ type = DataType.findType(javaType);
+ log.debug("Return type of UDF: " + DataType.findTypeName(type));
+ log.debug("EvalFuncSpec: funcSpec: " + funcSpec);
+ break;
+ default:
+ throw new ParseException("Received an unknown function type: " + funcType);
+ }
userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, args, type);
} else {
throw new ParseException("Could not instantiate function: " + funcNameAlias);
}
lp.add(userFunc);
- log.debug("FuncEvalSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
+ log.debug("EvalFuncSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
for(ExpressionOperator exprOp: args) {
lp.connect(exprOp, userFunc);
- log.debug("FuncEvalSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
+ log.debug("EvalFuncSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
}
- log.trace("Exiting FuncEvalSpec");
+ log.trace("Exiting EvalFuncSpec");
return userFunc;
}
}
+/**
+ * Parses the function reference after USING in LOAD, STORE, ORDER BY and
+ * streaming INPUT/OUTPUT clauses and returns the FuncSpec to use.
+ *
+ * The reference is either a DEFINE alias, in which case the FuncSpec stored
+ * for that alias (including its constructor arguments) is returned, or a
+ * qualified function name with an optional "(...)" string argument list.
+ * In both cases the function is instantiated from the resulting spec and
+ * checked against funcType, so a UDF of the wrong kind is rejected here.
+ *
+ * @param funcType FunctionType.COMPARISONFUNC, LOADFUNC or STOREFUNC; any
+ *        other value results in a ParseException
+ */
+FuncSpec NonEvalFuncSpec(byte funcType) :
+{
+    String functionName = null;
+    FuncSpec funcSpec = null;
+    String funcNameAlias = null;
+    String functionArgs = null;
+    Object func = null;
+    log.trace("Entering NonEvalFuncSpec");
+}
+{
+    (
+    (
+        LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction()
+        {
+            func = pigContext.instantiateFuncFromAlias(funcNameAlias);
+            try{
+                FunctionType.tryCasting(func, funcType);
+            } catch (Exception e){
+                ParseException pe = new ParseException(e.getMessage());
+                pe.initCause(e);
+                throw pe;
+            }
+        }
+    )
+|   functionName = QualifiedFunction() ( "(" functionArgs = StringList() ")" )?
+    )
+    {
+        if(null != func) {
+            // Alias path: reuse the FuncSpec recorded by DEFINE so the
+            // constructor arguments given there are kept (PIG-546).
+            if(null != funcNameAlias) {
+                funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
+            } else {
+                funcSpec = new FuncSpec(func.getClass().getName());
+            }
+        } else if (functionName != null) {
+            funcSpec = new FuncSpec(functionName + "(" + (functionArgs == null ? "" : functionArgs) + ")");
+        } else {
+            throw new ParseException("Could not instantiate function: " + funcNameAlias);
+        }
+        switch(funcType) {
+        case FunctionType.COMPARISONFUNC:
+        case FunctionType.LOADFUNC:
+        case FunctionType.STOREFUNC:
+            log.debug("NonEvalFuncSpec: funcSpec = " + funcSpec);
+            // Instantiate from the full spec (with constructor args) to make
+            // sure the UDF really implements the interface this clause needs.
+            func = pigContext.instantiateFuncFromSpec(funcSpec);
+            try{
+                FunctionType.tryCasting(func, funcType);
+            } catch (Exception e){
+                ParseException pe = new ParseException(e.getMessage());
+                pe.initCause(e);
+                throw pe;
+            }
+            break;
+        default:
+            throw new ParseException("Received an unknown function type: " + funcType);
+        }
+        log.trace("Exiting NonEvalFuncSpec");
+        return funcSpec;
+    }
+}
+
List<ExpressionOperator> EvalArgs(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
{
ArrayList<ExpressionOperator> specList = new ArrayList<ExpressionOperator>();
@@ -2943,26 +3010,24 @@
// These the simple non-terminals that are shared across many
-EvalFunc EvalFunction() :
+Object EvalFunction(byte funcType) :
{
String funcName;
- EvalFunc ef;
+ Object func = null;
log.trace("Entering EvalFunction");
}
{
funcName = QualifiedFunction()
{
+ func = pigContext.instantiateFuncFromAlias(funcName);
try{
- ef = (EvalFunc) pigContext.instantiateFuncFromAlias(funcName);
- Type javaType = ef.getReturnType();
- log.debug("Type: " + javaType);
- log.debug("funcName: " + funcName + " class name: " + ef.getClass().getName() + " return type: " + DataType.findType(javaType));
+ FunctionType.tryCasting(func, funcType);
}catch (Exception e){
throw new ParseException(e.getMessage());
}
log.trace("Exiting EvalFunction");
- return ef;
+ return func;
}
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Mon Dec 8 11:54:25 2008
@@ -32,9 +32,9 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.test.utils.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,8 +44,9 @@
private MiniCluster cluster = MiniCluster.buildCluster();
private File tmpFile;
+ TupleFactory tf = TupleFactory.getInstance();
+
public TestFilterUDF() throws ExecException, IOException{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
int LOOP_SIZE = 20;
tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -57,11 +58,29 @@
@Before
public void setUp() throws Exception {
-
+ // Per-test fixture: a fresh PigServer plus a temp file holding the
+ // integers 1..LOOP_SIZE, one per line.
+ // NOTE(review): the constructor still creates a tmpFile as well; this
+ // reassignment means the constructor's file is never deleted by
+ // tearDown() — confirm and keep only one of the two copies.
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ int LOOP_SIZE = 20;
+ tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ ps.println(i);
+ }
+ ps.close();
}
@After
public void tearDown() throws Exception {
+ // Remove the temp file that setUp() created for the current test.
+ tmpFile.delete();
+ }
+
+    /**
+     * Writes each element of data as one line of a new temp file.
+     *
+     * @param data lines to write, in order
+     * @return the newly created file
+     * @throws IOException if writing fails (PrintWriter would otherwise
+     *         swallow the error silently)
+     */
+    private File createFile(String[] data) throws Exception{
+        File f = File.createTempFile("tmp", "");
+        PrintWriter pw = new PrintWriter(f);
+        try {
+            for (String line : data) {
+                pw.println(line);
+            }
+            if (pw.checkError()) {
+                throw new IOException("Failed writing test data to " + f);
+            }
+        } finally {
+            pw.close();
+        }
+        return f;
}
static public class MyFilterFunction extends EvalFunc<Boolean>{
@@ -96,4 +115,38 @@
}
assertEquals(10, cnt);
}
+
+ @Test
+ public void testFilterUDFusingDefine() throws Exception{
+ File inputFile= createFile(
+ new String[]{
+ "www.paulisageek.com\t4",
+ "www.yahoo.com\t12344",
+ "google.com\t1",
+ "us2.amazon.com\t4141"
+ }
+ );
+
+ File filterFile = createFile(
+ new String[]{
+ "12344"
+ }
+ );
+
+ pigServer.registerQuery("define FILTER_CRITERION " + FILTERFROMFILE.class.getName() + "('" + FileLocalizer.hadoopify(Util.generateURI(filterFile.toString()), pigServer.getPigContext()) + "');");
+ pigServer.registerQuery("a = LOAD '" + Util.generateURI(inputFile.toString()) + "' as (url:chararray, numvisits:int);");
+ pigServer.registerQuery("b = filter a by FILTER_CRITERION(numvisits);");
+
+ Tuple expectedTuple = tf.newTuple();
+ expectedTuple.append(new String("www.yahoo.com"));
+ expectedTuple.append(new Integer("12344"));
+
+ Iterator<Tuple> iter = pigServer.openIterator("b");
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertTrue(t.equals(expectedTuple));
+ }
+
+ }
+
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Mon Dec 8 11:54:25 2008
@@ -1839,7 +1839,44 @@
"CONCAT(null, str);";
buildPlan(query);
}
+
+    /** A DEFINE alias for IsEmpty can be used as a filter condition. */
+    @Test
+    public void testFilterUdfDefine() {
+        buildPlan("define isempty IsEmpty(); a = load 'a' as (x:int, y:double, str:chararray);"
+            + "b = filter a by isempty(*);");
+    }
+
+    /** A DEFINE alias for PigStorage() can be used as a LOAD function. */
+    @Test
+    public void testLoadUdfDefine() {
+        String load = "define PS PigStorage(); a = load 'a' using PS as (x:int, y:double, str:chararray);";
+        String filter = "b = filter a by IsEmpty(*);";
+        buildPlan(load + filter);
+    }
+    /** A DEFINE alias carrying a constructor argument can be used as a LOAD function. */
+    @Test
+    public void testLoadUdfConstructorArgDefine() {
+        String load = "define PS PigStorage(':'); a = load 'a' using PS as (x:int, y:double, str:chararray);";
+        String filter = "b = filter a by IsEmpty(*);";
+        buildPlan(load + filter);
+    }
+
+    /** The same DEFINE alias for PigStorage() serves as both LOAD and STORE function. */
+    @Test
+    public void testStoreUdfDefine() {
+        String load = "define PS PigStorage(); a = load 'a' using PS as (x:int, y:double, str:chararray);";
+        String filter = "b = filter a by IsEmpty(*);";
+        String store = "store b into 'x' using PS;";
+        buildPlan(load + filter + store);
+    }
+
+    /** A DEFINE alias with a constructor argument serves as both LOAD and STORE function. */
+    @Test
+    public void testStoreUdfConstructorArgDefine() {
+        String load = "define PS PigStorage(':'); a = load 'a' using PS as (x:int, y:double, str:chararray);";
+        String filter = "b = filter a by IsEmpty(*);";
+        String store = "store b into 'x' using PS;";
+        buildPlan(load + filter + store);
+    }
+
private void printPlan(LogicalPlan lp) {
LOPrinter graphPrinter = new LOPrinter(System.err, lp);
System.err.println("Printing the logical plan");
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Mon Dec 8 11:54:25 2008
@@ -302,6 +302,77 @@
}
@Test
+ public void testInputShipSpecsWithUDFDefine() throws Exception {
+ // Two chained streaming commands whose INPUT/OUTPUT serializers are given
+ // through a DEFINE alias (PS) that carries a constructor argument (','),
+ // exercising PIG-546. The result is stored and read back for comparison.
+ // FIXME : this should be tested in all modes
+ if(execType == ExecType.LOCAL)
+ return;
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
+ "A,9"});
+
+ // Perl script: copies the file named by its first argument to stdout,
+ // echoing each line to stderr.
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+ "while (<INFILE>) {",
+ " chomp $_;",
+ " print STDOUT \"$_\n\";",
+ " print STDERR \"STDERR: $_\n\";",
+ "}",
+ };
+ File command1 = Util.createInputFile("script", "pl", script);
+ File command2 = Util.createInputFile("script", "pl", script);
+
+ // Expected results: the rows surviving "$1 > 3", in input order
+ String[] expectedFirstFields =
+ new String[] {"A", "B", "C", "A", "D", "A"};
+ Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+ Tuple[] expectedResults =
+ setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+ // Pig query to run
+
+ pigServer.registerQuery(
+ "define PS " + PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery(
+ "define CMD1 `" + command1.getName() + " foo` " +
+ "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
+ "input('foo' using PS ) " +
+ "output(stdout using PS ) " +
+ "stderr();");
+ pigServer.registerQuery(
+ "define CMD2 `" + command2.getName() + " bar` " +
+ "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
+ "input('bar' using PS ) " +
+ "output(stdout using PS ) " +
+ "stderr();");
+ pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS ;");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+ pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+ "through CMD1;");
+ pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+
+ // NOTE(review): fixed output path, deleted before the run — confirm no
+ // other test in this class relies on its contents concurrently.
+ String output = "/pig/out";
+ pigServer.deleteFile(output);
+ pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+ // Read the stored result back with a comma-delimited PigStorage.
+ InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ PigStorage ps = new PigStorage(",");
+ ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+ List<Tuple> outputs = new ArrayList<Tuple>();
+ Tuple t;
+ while ((t = ps.getNext()) != null) {
+ outputs.add(t);
+ }
+
+ // Compare the stored output against the expected tuples
+ Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+ }
+
+ @Test
public void testInputCacheSpecs() throws Exception {
// Can't run this without HDFS
if(execType == ExecType.LOCAL)
@@ -437,6 +508,68 @@
}
@Test
+    public void testOutputShipSpecsWithUDFDefine() throws Exception {
+        // Streams through a script that writes two output files, both
+        // deserialized by a DEFINE'd PigStorage(',') alias (PIG-546), and
+        // checks the contents of the second one ('bar').
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"A,1", "B,2", "C,3",
+                              "D,2", "A,5", "B,5",
+                              "C,8", "A,8", "D,8",
+                              "A,9"});
+
+        // Perl script: copies stdin to the file named by $ARGV[0] and writes
+        // one "A,10" line per input record to the file named by $ARGV[1].
+        // Each die message reports the same argument that its open() used.
+        String[] script =
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "while (<STDIN>) {",
+                          " print OUTFILE \"$_\n\";",
+                          " print STDERR \"STDERR: $_\n\";",
+                          " print OUTFILE2 \"A,10\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+
+        // Expected results: one "A,10" per row surviving "$1 > 3" (6 rows)
+        String[] expectedFirstFields =
+            new String[] {"A", "A", "A", "A", "A", "A"};
+        Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
+        Tuple[] expectedResults =
+            setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define PS " + PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "output('foo' using PS, " +
+                "'bar' using PS) " +
+                "stderr();");
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS;");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+        // Read back the secondary output 'bar' with a comma-delimited PigStorage.
+        InputStream op = FileLocalizer.open(output+"/bar",
+                pigServer.getPigContext());
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Compare the 'bar' output against the expected tuples
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+ @Test
public void testInputOutputSpecs() throws Exception {
// FIXME : this should be tested in all modes
if(execType == ExecType.LOCAL)
Added: hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=724462&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java Mon Dec 8 11:54:25 2008
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.lang.Boolean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+
+/**
+ * FilterFunc that keeps a record when its first field equals the first
+ * tab-separated column of some line in a lookup file. The lookup file name is
+ * passed as a constructor argument, typically through DEFINE:
+ *
+ *   define MyFilterSet org.apache.pig.test.utils.FILTERFROMFILE('/user/pig/filterfile');
+ *
+ *   A = load 'mydata' using PigStorage() as ( a, b );
+ *   B = filter A by MyFilterSet(a);
+ *
+ * The lookup file is read lazily, on the first call to exec().
+ */
+public class FILTERFROMFILE extends FilterFunc{
+    private String filterFileName = "";
+
+    /** Lookup keys; null until init() has read the whole filter file. */
+    Map<String, Boolean> lookupTable = null;
+
+    public FILTERFROMFILE(){
+    }
+
+    /** @param filterFileName path of the lookup file, opened via FileLocalizer.openDFSFile */
+    public FILTERFROMFILE(String filterFileName){
+        this.filterFileName = filterFileName;
+    }
+
+    /**
+     * Loads the lookup file into lookupTable. The table is published only
+     * after the whole file has been read, so a failed read is retried on the
+     * next exec() call instead of leaving a partial table behind.
+     *
+     * @throws IOException if the file cannot be opened or read
+     */
+    private void init() throws IOException {
+        Map<String, Boolean> table = new HashMap<String, Boolean>();
+
+        // NOTE(review): relies on PigInputFormat.sJob being set in the
+        // executing task to resolve the file system — confirm.
+        Properties props = ConfigurationUtil.toProperties(PigInputFormat.sJob);
+        InputStream is = FileLocalizer.openDFSFile(filterFileName, props);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        try {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                // Only the first tab-separated column is used as a key.
+                table.put(line.split("\t")[0], Boolean.TRUE);
+            }
+        } finally {
+            reader.close();
+        }
+
+        lookupTable = table;
+    }
+
+    /**
+     * @param input tuple whose first field is looked up
+     * @return true if the string form of the first field is a key in the
+     *         lookup file; false if it is not, or if the field is null
+     * @throws IOException if the lookup file cannot be read or the field
+     *         cannot be fetched from the tuple
+     */
+    @Override
+    public Boolean exec(Tuple input) throws IOException {
+        if (lookupTable == null){
+            init();
+        }
+        Object field;
+        try {
+            field = input.get(0);
+        } catch (ExecException e) {
+            IOException ioe = new IOException("Error getting data");
+            ioe.initCause(e);
+            throw ioe;
+        }
+        // A null field cannot match any key; avoid an NPE on toString().
+        if (field == null) {
+            return Boolean.FALSE;
+        }
+        return Boolean.valueOf(lookupTable.containsKey(field.toString()));
+    }
+}