You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/09/06 01:36:34 UTC

svn commit: r692581 - /incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java

Author: gates
Date: Fri Sep  5 16:36:33 2008
New Revision: 692581

URL: http://svn.apache.org/viewvc?rev=692581&view=rev
Log:
PIG-401 ImplicitSplitInserter class that should have been in the last checkin.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java?rev=692581&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java Fri Sep  5 16:36:33 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+public class ImplicitSplitInserter extends LogicalTransformer {
+    // Rule: a non-LOSplit operator with two or more successors gets an explicit LOSplit inserted after it.
+    public ImplicitSplitInserter(LogicalPlan plan) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+    // check(): true when nodes.get(0) is not an LOSplit and has at least two successors in mPlan.
+    @Override
+    public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+        // Look to see if this is a non-split node with two or more outputs.
+        // If so it matches.  Only the first entry of nodes is examined.
+        LogicalOperator op = nodes.get(0);
+        List<LogicalOperator> succs = mPlan.getSuccessors(op);
+        if (succs == null || succs.size() < 2) return false; // no fan-out, nothing to split
+        if (op instanceof LOSplit) return false; // already an explicit split
+        return true;
+    }
+    // transform(): rewires nodes.get(0) -> LOSplit -> one LOSplitOutput (constant-true condition plan) per old successor.
+    @Override
+    public void transform(List<LogicalOperator> nodes)
+            throws OptimizerException {
+        // Insert a split and its corresponding SplitOutput nodes into the plan
+        // between nodes.get(0) and each of its current successors.
+        String scope = nodes.get(0).getOperatorKey().scope; // every new operator key reuses node 0's scope
+        NodeIdGenerator idGen = NodeIdGenerator.getGenerator();
+        LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, 
+                idGen.getNextNodeId(scope)), new ArrayList<LogicalOperator>());
+        try {
+            mPlan.add(splitOp);
+            
+            
+            // Find all the successors and disconnect them.  Keep our own copy
+            // of the list, as we're changing the graph by doing these calls
+            // and that will change the list of successors we iterate over.
+            List<LogicalOperator> succs = 
+                new ArrayList<LogicalOperator>(mPlan.getSuccessors(nodes.get(0)));
+            int index = -1; // pre-incremented per successor, so the first LOSplitOutput is given 0
+            for (LogicalOperator succ : succs) {
+                mPlan.disconnect(nodes.get(0), succ);
+                LogicalPlan condPlan = new LogicalPlan();
+                LOConst cnst = new LOConst(mPlan, new OperatorKey(scope, 
+                        idGen.getNextNodeId(scope)), new Boolean(true)); // NOTE(review): Boolean.TRUE would avoid the allocation
+                cnst.setType(DataType.BOOLEAN);
+                condPlan.add(cnst); // NOTE(review): cnst was constructed with mPlan, not condPlan - confirm intended
+                LOSplitOutput splitOutput = new LOSplitOutput(mPlan, 
+                        new OperatorKey(scope, idGen.getNextNodeId(scope)), ++index, condPlan);
+                splitOp.addOutput(splitOutput);
+                mPlan.add(splitOutput);
+                mPlan.connect(splitOp, splitOutput);
+                mPlan.connect(splitOutput, succ);
+                // Patch up the contained plans of succ (helper is defined outside this class).
+                fixUpContainedPlans(nodes.get(0), splitOutput, succ, null);
+            }
+            mPlan.connect(nodes.get(0), splitOp); // done last, after every old outgoing edge has been disconnected
+        } catch (Exception e) {
+            throw new OptimizerException(e); // any failure above is rewrapped with e passed along
+        }
+    }
+}