You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/15 00:20:13 UTC

svn commit: r686064 - in /incubator/pig/branches/types: src/org/apache/pig/PigServer.java src/org/apache/pig/impl/plan/SplitIntroducer.java test/org/apache/pig/test/TestImplicitSplit.java

Author: olga
Date: Thu Aug 14 15:20:11 2008
New Revision: 686064

URL: http://svn.apache.org/viewvc?rev=686064&view=rev
Log:
PIG-375: addition of implicit split

Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=686064&r1=686063&r2=686064&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Aug 14 15:20:11 2008
@@ -47,6 +47,7 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -59,7 +60,9 @@
 import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.SplitIntroducer;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.impl.util.PropertiesUtil;
@@ -535,7 +538,9 @@
         // Set the logical plan values correctly in all the operators
         PlanSetter ps = new PlanSetter(lp);
         ps.visit();
-
+        
+        (new SplitIntroducer(lp)).introduceImplSplits();
+        
         // run through validator
         CompilationMessageCollector collector = new CompilationMessageCollector() ;
         FrontendException caught = null;

Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java?rev=686064&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java Thu Aug 14 15:20:11 2008
@@ -0,0 +1,107 @@
+package org.apache.pig.impl.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+/**
+ * Makes implicit splits explicit: every root of the logical plan that feeds
+ * more than one successor gets an LOSplit inserted below it, with one
+ * LOSplitOutput (guarded by a constant TRUE condition) per former successor.
+ */
+public class SplitIntroducer extends PlanWalker<LogicalOperator, LogicalPlan> {
+    private final NodeIdGenerator nodeIdGen;
+
+    /** @param plan the logical plan that is rewritten in place */
+    public SplitIntroducer(LogicalPlan plan) {
+        super(plan);
+        nodeIdGen = NodeIdGenerator.getGenerator();
+    }
+
+    /** Returns the next node id the generator hands out for {@code scope}. */
+    private long getNextId(String scope) {
+        return nodeIdGen.getNextNodeId(scope);
+    }
+
+    /** Creates another SplitIntroducer for the given plan. */
+    @Override
+    public PlanWalker<LogicalOperator, LogicalPlan> spawnChildWalker(LogicalPlan plan) {
+        return new SplitIntroducer(plan);
+    }
+
+    /**
+     * Inserts an explicit split below each root that has several successors.
+     * Only roots are examined; operators further down are left untouched.
+     */
+    public void introduceImplSplits() throws VisitorException {
+        List<LogicalOperator> roots = mPlan.getRoots();
+        // Test for null before copying; copying a null list would throw.
+        if(roots == null) return;
+        // Walk a snapshot because processNode() rewires the plan.
+        for (LogicalOperator root : copySucs(roots)) {
+            processNode(root);
+        }
+    }
+
+    /** Exists only to conform to the PlanWalker interface; always throws. */
+    @Override
+    public void walk(PlanVisitor visitor) throws VisitorException {
+        throw new VisitorException(
+                "This method is not to be used. This Walker does not call any visit() methods. It only alters the plan by introducing implicit splits if necessary.");
+    }
+
+    /** Rewires root -> successors into root -> LOSplit -> LOSplitOutput -> successor. */
+    private void processNode(LogicalOperator root) throws VisitorException {
+        // Splits that are already explicit need no rewriting.
+        if(root instanceof LOSplit || root instanceof LOSplitOutput) return;
+        List<LogicalOperator> sucs = mPlan.getSuccessors(root);
+        if(sucs == null || sucs.size() < 2) return; // no fan-out, nothing to do
+        // Work on a copy: disconnect() may modify the list returned by getSuccessors().
+        sucs = copySucs(sucs);
+        disconnect(root,sucs);
+        String scope = root.getOperatorKey().scope;
+        LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, getNextId(scope)), new ArrayList<LogicalOperator>());
+        mPlan.add(splitOp);
+        try {
+            mPlan.connect(root, splitOp);
+            int index = -1;
+            for (LogicalOperator operator : sucs) {
+                // Each output is guarded by a constant TRUE condition.
+                LogicalPlan condPlan = new LogicalPlan();
+                LOConst cnst = new LOConst(mPlan,new OperatorKey(scope, getNextId(scope)), true);
+                cnst.setType(DataType.BOOLEAN);
+                condPlan.add(cnst);
+                LOSplitOutput splitOutput = new LOSplitOutput(mPlan, new OperatorKey(scope, getNextId(scope)), ++index, condPlan);
+                splitOp.addOutput(splitOutput);
+                mPlan.add(splitOutput);
+                mPlan.connect(splitOp, splitOutput);
+                mPlan.connect(splitOutput, operator);
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    /** Returns a new list holding the same operators as {@code successors}. */
+    private List<LogicalOperator> copySucs(List<LogicalOperator> successors){
+        return new ArrayList<LogicalOperator>(successors);
+    }
+
+    /** Disconnects {@code from} from each operator in {@code successors}. */
+    private void disconnect(LogicalOperator from, List<LogicalOperator> successors) {
+        for (LogicalOperator operator : successors) {
+            mPlan.disconnect(from, operator);
+        }
+    }
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java?rev=686064&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java Thu Aug 14 15:20:11 2008
@@ -0,0 +1,52 @@
+package org.apache.pig.test;
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Runs a script in which one alias feeds two filters and checks the union's record count. */
+public class TestImplicitSplit extends TestCase{
+    private PigServer pigServer;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {}
+
+    @Test
+    public void testImplicitSplit() throws Exception{
+        int LOOP_SIZE = 20;
+        File tmpFile = File.createTempFile("test", "txt");
+        tmpFile.deleteOnExit(); // do not leave the input fixture behind
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            ps.println(i);
+        }
+        ps.close();
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pigServer.registerQuery("B = filter A by $0<=10;");
+        pigServer.registerQuery("C = filter A by $0>10;");
+        pigServer.registerQuery("D = union B,C;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        if(!iter.hasNext()) fail("No Output received");
+        int cnt = 0; // number of tuples produced by D
+        for (; iter.hasNext(); iter.next()) {
+            ++cnt;
+        }
+        assertEquals(LOOP_SIZE, cnt); // one output tuple per line written above
+    }
+}