You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/15 00:20:13 UTC
svn commit: r686064 - in /incubator/pig/branches/types:
src/org/apache/pig/PigServer.java
src/org/apache/pig/impl/plan/SplitIntroducer.java
test/org/apache/pig/test/TestImplicitSplit.java
Author: olga
Date: Thu Aug 14 15:20:11 2008
New Revision: 686064
URL: http://svn.apache.org/viewvc?rev=686064&view=rev
Log:
PIG-375: addition of implicit split
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
Modified:
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=686064&r1=686063&r2=686064&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Aug 14 15:20:11 2008
@@ -47,6 +47,7 @@
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.ExpressionOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -59,7 +60,9 @@
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.SplitIntroducer;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.util.PropertiesUtil;
@@ -535,7 +538,9 @@
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lp);
ps.visit();
-
+
+ (new SplitIntroducer(lp)).introduceImplSplits();
+
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
FrontendException caught = null;
Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java?rev=686064&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java Thu Aug 14 15:20:11 2008
@@ -0,0 +1,107 @@
+package org.apache.pig.impl.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+/**
+ * Alters a {@link LogicalPlan} in place so that a root operator with two or
+ * more successors feeds them through an explicit {@link LOSplit}.
+ *
+ * <p>For such a root, the direct edges to its successors are removed, one
+ * {@link LOSplit} is connected below the root, and each former successor is
+ * re-attached underneath its own {@link LOSplitOutput}. The condition plan of
+ * every split output consists of a single boolean constant {@code true}.
+ *
+ * <p>Only the roots of the plan are examined; operators further down the plan
+ * are not visited by this class.
+ *
+ * <p>Although this class extends {@link PlanWalker}, it never invokes a
+ * visitor: use {@link #introduceImplSplits()} instead of
+ * {@link #walk(PlanVisitor)}.
+ */
+public class SplitIntroducer extends PlanWalker<LogicalOperator, LogicalPlan> {
+    private final NodeIdGenerator nodeIdGen;
+
+    /**
+     * @param plan the logical plan that {@link #introduceImplSplits()} will modify
+     */
+    public SplitIntroducer(LogicalPlan plan) {
+        super(plan);
+        nodeIdGen = NodeIdGenerator.getGenerator();
+    }
+
+    /** Returns the next node id issued by the shared generator for {@code scope}. */
+    private long getNextId(String scope) {
+        return nodeIdGen.getNextNodeId(scope);
+    }
+
+    /**
+     * Creates a new introducer bound to {@code plan}.
+     */
+    @Override
+    public PlanWalker<LogicalOperator, LogicalPlan> spawnChildWalker(LogicalPlan plan) {
+        return new SplitIntroducer(plan);
+    }
+
+    /**
+     * Inserts an implicit split below every root of the plan that has more
+     * than one successor. Does nothing if the plan reports no roots.
+     *
+     * @throws VisitorException if the plan rejects one of the new connections
+     */
+    public void introduceImplSplits() throws VisitorException {
+        List<LogicalOperator> planRoots = mPlan.getRoots();
+        // Guard before copying: the copy itself cannot cope with a null list.
+        if (planRoots == null) {
+            return;
+        }
+        // Iterate over a snapshot of the roots rather than the plan's own list.
+        for (LogicalOperator root : copySucs(planRoots)) {
+            processNode(root);
+        }
+    }
+
+    /**
+     * Present only to conform to the {@link PlanWalker} interface; always fails.
+     *
+     * @throws VisitorException always, because this walker calls no visit() methods
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws VisitorException {
+        throw new VisitorException(
+                "This method is not to be used. This Walker does not call any visit() methods. It only alters the plan by introducing implicit splits if necessary.");
+    }
+
+    /**
+     * Rewires {@code root} through a new {@link LOSplit} when it has at least
+     * two successors; otherwise leaves the plan untouched.
+     *
+     * @param root the operator whose outgoing edges are inspected
+     * @throws VisitorException wrapping any {@link PlanException} raised while connecting
+     */
+    private void processNode(LogicalOperator root) throws VisitorException {
+        // Operators that are already part of a split are left alone.
+        if (root instanceof LOSplit || root instanceof LOSplitOutput) {
+            return;
+        }
+        List<LogicalOperator> current = mPlan.getSuccessors(root);
+        if (current == null || current.size() < 2) {
+            return;
+        }
+
+        // Work on a snapshot; presumably the plan's own successor list changes
+        // as edges are removed below.
+        List<LogicalOperator> sucs = copySucs(current);
+        disconnect(root, sucs);
+
+        String scope = root.getOperatorKey().scope;
+        LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, getNextId(scope)),
+                new ArrayList<LogicalOperator>());
+        mPlan.add(splitOp);
+        try {
+            mPlan.connect(root, splitOp);
+            for (int index = 0; index < sucs.size(); index++) {
+                LogicalOperator operator = sucs.get(index);
+
+                // Condition plan holding only the boolean constant true.
+                LogicalPlan condPlan = new LogicalPlan();
+                LOConst cnst = new LOConst(mPlan, new OperatorKey(scope, getNextId(scope)), true);
+                cnst.setType(DataType.BOOLEAN);
+                condPlan.add(cnst);
+
+                // One split output per former successor, numbered from 0 in
+                // the order of the snapshot.
+                LOSplitOutput splitOutput = new LOSplitOutput(mPlan,
+                        new OperatorKey(scope, getNextId(scope)), index, condPlan);
+                splitOp.addOutput(splitOutput);
+                mPlan.add(splitOutput);
+                mPlan.connect(splitOp, splitOutput);
+                mPlan.connect(splitOutput, operator);
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    /** Returns a new list holding the same operators, in the same order, as {@code successors}. */
+    private List<LogicalOperator> copySucs(List<LogicalOperator> successors) {
+        return new ArrayList<LogicalOperator>(successors);
+    }
+
+    /** Removes the edge from {@code from} to each operator in {@code successors}. */
+    private void disconnect(LogicalOperator from, List<LogicalOperator> successors) {
+        for (LogicalOperator operator : successors) {
+            mPlan.disconnect(from, operator);
+        }
+    }
+
+    /* Disabled manual driver, kept for reference.
+    public static void main(String[] args) throws ExecException, IOException, FrontendException {
+        PigServer ser = new PigServer(ExecType.LOCAL);
+        ser.registerQuery("A = load 'file:/etc/passwd' using PigStorage(':');");
+        ser.registerQuery("B = foreach A generate $0;");
+        ser.registerQuery("C = foreach A generate $1;");
+        ser.registerQuery("D = group B by $0, C by $0;");
+        LogicalPlan lp = ser.getPlanFromAlias("D", "Testing");
+        lp.explain(System.out, System.err);
+        LogicalPlan fixedLp = ser.compileLp(lp, "Testing");
+        System.out.println();
+        fixedLp.explain(System.out, System.err);
+    }*/
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java?rev=686064&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java Thu Aug 14 15:20:11 2008
@@ -0,0 +1,52 @@
+package org.apache.pig.test;
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs a small script in local mode in which one loaded relation is consumed
+ * by two filters whose results are then unioned, and checks that every input
+ * row comes back exactly once.
+ */
+public class TestImplicitSplit extends TestCase {
+    private PigServer pigServer;
+
+    /** Creates a fresh local-mode {@link PigServer} for each test. */
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Writes the integers 1..20 to a temporary file, loads it as A, filters
+     * it into B ($0 &lt;= 10) and C ($0 &gt; 10), unions them into D and
+     * expects D to contain as many tuples as were written.
+     */
+    @Test
+    public void testImplicitSplit() throws Exception {
+        final int rowCount = 20;
+
+        File tmpFile = File.createTempFile("test", "txt");
+        // Do not leave the input file behind once the JVM exits.
+        tmpFile.deleteOnExit();
+
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        try {
+            for (int i = 1; i <= rowCount; i++) {
+                ps.println(i);
+            }
+        } finally {
+            // Release the file handle even if writing fails.
+            ps.close();
+        }
+
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pigServer.registerQuery("B = filter A by $0<=10;");
+        pigServer.registerQuery("C = filter A by $0>10;");
+        pigServer.registerQuery("D = union B,C;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        assertTrue("No Output received", iter.hasNext());
+
+        int cnt = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            ++cnt;
+        }
+        // Compare against the number of rows written, not a repeated literal.
+        assertEquals(rowCount, cnt);
+    }
+}