You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/03/15 04:28:28 UTC
svn commit: r923043 [2/5] - in /hadoop/pig/trunk:
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/logical/
src/org/apache/pig/experimental/logical/expression/
src/org/apache/pig/experimental/logical/optimizer/ src/org...
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.util.MultiMap;
+
+public class LOCogroup extends LogicalRelationalOperator {
+
+    // One flag per cogroup input saying whether that input is inner.
+    // Null when the caller supplied no flags (see the constructor).
+    private boolean[] mIsInner;
+
+    // Group-key expression plans, keyed by input index
+    private MultiMap<Integer,LogicalExpressionPlan> mExpressionPlans;
+
+    /**
+     * Enum for the type of group
+     */
+    public static enum GROUPTYPE {
+        REGULAR,    // Regular (co)group
+        COLLECTED   // Collected group
+    }
+
+    private GROUPTYPE mGroupType;
+
+    /*
+     * Schema of the group key. It is kept across calls so that its uid
+     * stays stable between resetSchema and getSchema.
+     */
+    private LogicalFieldSchema groupKeySchema = null;
+
+    /*
+     * Bag uids already generated per input index. Kept so that the uids
+     * persist between calls of resetSchema and getSchema.
+     */
+    private Map<Integer,Long> generatedInputUids = null;
+
+    static final String GROUP_COL_NAME = "group";
+
+    /**
+     * Creates a REGULAR (co)group with parallelism -1.
+     * @param plan logical plan this operator belongs to
+     * @param expressionPlans group-key expression plans keyed by input index
+     * @param isInner per-input inner flags; copied, may be null
+     */
+    public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan>
+            expressionPlans, boolean[] isInner ) {
+        this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner, -1 );
+    }
+
+    /**
+     * Creates a (co)group of the given type.
+     * @param plan logical plan this operator belongs to
+     * @param expressionPlans group-key expression plans keyed by input index
+     * @param groupType REGULAR or COLLECTED
+     * @param isInner per-input inner flags; copied, may be null
+     * @param requestedParrellism requested parallelism (currently not stored, see note)
+     */
+    public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan>
+            expressionPlans, GROUPTYPE groupType, boolean[] isInner, int requestedParrellism) {
+        super("LOCogroup", plan);
+        this.mExpressionPlans = expressionPlans;
+        if( isInner != null ) {
+            // Defensive copy so later changes to the caller's array do not leak in
+            mIsInner = Arrays.copyOf(isInner, isInner.length);
+        }
+        this.mGroupType = groupType;
+        this.generatedInputUids = new HashMap<Integer,Long>();
+        // NOTE(review): requestedParrellism is accepted but never recorded, so
+        // getRequestedParallelisam() will not reflect it. Wire it into the base
+        // class once its setter/constructor is confirmed.
+    }
+
+    /**
+     * Computes the field schema produced by a group-key expression plan.
+     * @param exprPlan expression plan that generates the key field
+     * @return the field schema, or null if the key is a bag (bags are not
+     *         supported as cogroup keys)
+     */
+    private LogicalFieldSchema getPlanSchema( LogicalExpressionPlan exprPlan ) {
+        LogicalExpression sourceExp = (LogicalExpression) exprPlan.getSources().get(0);
+        byte sourceType = sourceExp.getType();
+        // We don't support bags for Cogroup
+        if( sourceType == DataType.BAG ) {
+            return null;
+        }
+        LogicalSchema fieldSchema = null;
+        String alias = null;
+
+        // For a projection, inherit the inner schema and alias of the projected column
+        if (sourceExp instanceof ProjectExpression) {
+            ProjectExpression project = (ProjectExpression) sourceExp;
+            LogicalRelationalOperator op = null;
+            try {
+                op = project.findReferent(this);
+            } catch(Exception e) {
+                throw new RuntimeException(e);
+            }
+            LogicalSchema s = op.getSchema();
+            if (s != null) {
+                LogicalFieldSchema projected = s.getField(project.getColNum());
+                fieldSchema = projected.schema;
+                alias = projected.alias;
+            }
+        }
+
+        return new LogicalFieldSchema(alias, fieldSchema, sourceType, sourceExp.getUid());
+    }
+
+    /**
+     * Returns the cogroup schema: a "group" key field followed by one bag
+     * field per input (in predecessor order) holding that input's schema.
+     * @return the schema, or null if any key or input schema is unknown
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        // If the schema was calculated before, just return it
+        if (schema != null) {
+            return schema;
+        }
+
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+            if (inputs == null) {
+                return null;
+            }
+        } catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessors of " + name
+                    + " operator. ", e);
+        }
+
+        List<LogicalFieldSchema> fieldSchemaList = new ArrayList<LogicalFieldSchema>();
+
+        // Only calculate the key schema if we haven't yet. It is non-null when the
+        // schema was calculated and then reset, and reusing it keeps the uid stable.
+        if( groupKeySchema == null ) {
+            // If any input has more than one key expression, the group column is a tuple
+            boolean hasMultipleKeys = false;
+            for( Integer key : mExpressionPlans.keySet() ) {
+                if( mExpressionPlans.get(key).size() > 1 ) {
+                    hasMultipleKeys = true;
+                    break;
+                }
+            }
+
+            // Sort the input indexes so the lowest-numbered input is consulted first
+            TreeSet<Integer> keySet = new TreeSet<Integer>();
+            keySet.addAll( mExpressionPlans.keySet() );
+
+            if( hasMultipleKeys ) {
+                LogicalSchema keySchema = new LogicalSchema();
+                for( Integer key : keySet ) {
+                    Collection<LogicalExpressionPlan> keyPlans = mExpressionPlans.get(key);
+                    for( LogicalExpressionPlan keyPlan : keyPlans ) {
+                        LogicalFieldSchema fieldSchema = getPlanSchema(keyPlan);
+                        // An unknown key field means the whole schema is unknown
+                        if( fieldSchema == null ) {
+                            schema = null;
+                            return schema;
+                        }
+                        // Give the key field its own uid
+                        fieldSchema.uid = LogicalExpression.getNextUid();
+                        keySchema.addField(fieldSchema);
+                    }
+                    // We only need the key fields of one input, not all
+                    break;
+                }
+                groupKeySchema = new LogicalFieldSchema(GROUP_COL_NAME, keySchema, DataType.TUPLE,
+                        LogicalExpression.getNextUid() );
+            } else {
+                for( Integer key : keySet ) {
+                    Collection<LogicalExpressionPlan> keyPlans = mExpressionPlans.get(key);
+                    for( LogicalExpressionPlan keyPlan : keyPlans ) {
+                        groupKeySchema = getPlanSchema(keyPlan);
+                        // An unknown key field means the whole schema is unknown
+                        if( groupKeySchema == null ) {
+                            schema = null;
+                            return schema;
+                        }
+                        // Rename to "group" and give the key field its own uid
+                        groupKeySchema.alias = GROUP_COL_NAME;
+                        groupKeySchema.uid = LogicalExpression.getNextUid();
+                        break;
+                    }
+                    break;
+                }
+            }
+        }
+
+        fieldSchemaList.add( groupKeySchema );
+
+        // Generate one bag field per input
+        int counter = 0;
+        for (Operator op : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
+            // The schema of one input is unknown, so the cogroup schema is unknown; just return
+            if (inputSchema == null) {
+                schema = null;
+                return schema;
+            }
+
+            // Reuse the bag uid already generated for this input, if any
+            long bagUid = -1;
+            if( generatedInputUids.containsKey(counter) ) {
+                bagUid = generatedInputUids.get(counter);
+            } else {
+                bagUid = LogicalExpression.getNextUid();
+                generatedInputUids.put( counter, bagUid );
+            }
+
+            LogicalFieldSchema newBagSchema = new LogicalFieldSchema(
+                    ((LogicalRelationalOperator)op).getAlias(), inputSchema,
+                    DataType.BAG, bagUid);
+
+            fieldSchemaList.add( newBagSchema );
+            counter ++;
+        }
+
+        schema = new LogicalSchema();
+        for(LogicalFieldSchema fieldSchema: fieldSchemaList) {
+            schema.addField(fieldSchema);
+        }
+
+        return schema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOCogroup(this);
+    }
+
+    /**
+     * Two cogroups are equal when they have the same group type, the same
+     * inner flags (both possibly null), and pairwise-equal key plans for every
+     * input, and pass the base-class equality check.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOCogroup)) {
+            return false;
+        }
+        LOCogroup oc = (LOCogroup)other;
+        // Arrays.equals is null-safe and also compares lengths
+        if( mGroupType != oc.mGroupType
+                || !Arrays.equals(mIsInner, oc.mIsInner)
+                || mExpressionPlans.size() != oc.mExpressionPlans.size() ) {
+            return false;
+        }
+        for( Integer key : mExpressionPlans.keySet() ) {
+            if( ! oc.mExpressionPlans.containsKey(key) ) {
+                return false;
+            }
+            // Copy into lists instead of casting: the MultiMap only promises a Collection
+            List<LogicalExpressionPlan> expList1 =
+                    new ArrayList<LogicalExpressionPlan>(mExpressionPlans.get(key));
+            List<LogicalExpressionPlan> expList2 =
+                    new ArrayList<LogicalExpressionPlan>(oc.mExpressionPlans.get(key));
+            if( expList1.size() != expList2.size() ) {
+                return false;
+            }
+            for (int i = 0; i < expList1.size(); i++) {
+                if (!expList1.get(i).isEqual(expList2.get(i))) {
+                    return false;
+                }
+            }
+        }
+        return checkEquality((LogicalRelationalOperator) other);
+    }
+
+    public GROUPTYPE getGroupType() {
+        return mGroupType;
+    }
+
+    /**
+     * Returns an Unmodifiable Map of Input Number to Uid
+     * @return Unmodifiable Map<Integer,Long>
+     */
+    public Map<Integer,Long> getGeneratedInputUids() {
+        return Collections.unmodifiableMap( generatedInputUids );
+    }
+
+    /** @return the group-key expression plans keyed by input index (not a copy) */
+    public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
+        return mExpressionPlans;
+    }
+
+    /** @return the per-input inner flags; the internal array itself, possibly null */
+    public boolean[] getInner() {
+        return mIsInner;
+    }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java Mon Mar 15 03:28:27 2010
@@ -63,6 +63,13 @@ public class LOGenerate extends LogicalR
}
LogicalSchema s = op.getSchema();
if (s != null) {
+ if (((ProjectExpression)exp).isProjectStar()) {
+ for(LogicalFieldSchema f: s.getFields()) {
+ schema.addField(f);
+ }
+ continue;
+ }
+
fieldSchema = s.getField(((ProjectExpression)exp).getColNum()).schema;
alias = s.getField(((ProjectExpression)exp).getColNum()).alias;
}
@@ -114,6 +121,10 @@ public class LOGenerate extends LogicalR
return flattenFlags;
}
+    /**
+     * Replaces the flatten flags of this generate operator.
+     * @param flatten the new flags; stored by reference, not copied
+     */
+    public void setFlattenFlags(boolean[] flatten) {
+        this.flattenFlags = flatten;
+    }
+
@Override
public boolean isEqual(Operator other) {
if (!(other instanceof LOGenerate)) {
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java Mon Mar 15 03:28:27 2010
@@ -57,27 +57,27 @@ public class LOInnerLoad extends Logical
LogicalRelationalOperator op = (LogicalRelationalOperator)p.getPredecessors(foreach).get(0);
LogicalSchema s = op.getSchema();
if (s != null) {
- schema = new LogicalSchema();
- long uid = prj.getUid();
- for(int i=0; i<s.size(); i++) {
- if (uid == s.getField(i).uid) {
- schema.addField(s.getField(i));
- }
+ if (prj.isProjectStar()) {
+ schema = s;
+ } else {
+ schema = new LogicalSchema();
+ schema.addField(s.getField(getColNum()));
}
- }
+ }
if ( schema != null && schema.size() == 0) {
schema = null;
}
}catch(Exception e) {
+ e.printStackTrace();
throw new RuntimeException(e);
}
return schema;
}
- public LogicalExpressionPlan getExpression() {
- return (LogicalExpressionPlan)prj.getPlan();
+ public ProjectExpression getProjection() {
+ return prj;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java Mon Mar 15 03:28:27 2010
@@ -31,7 +31,7 @@ public class LOLoad extends LogicalRelat
private LogicalSchema scriptSchema;
private FileSpec fs;
- private transient LoadPushDown loadFunc;
+ private transient LoadFunc loadFunc;
/**
*
@@ -46,13 +46,11 @@ public class LOLoad extends LogicalRelat
fs = loader;
}
- public LoadPushDown getLoadPushDown() {
+ public LoadFunc getLoadFunc() {
try {
if (loadFunc == null) {
- Object obj = PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
- if (obj instanceof LoadPushDown) {
- loadFunc = (LoadPushDown)obj;
- }
+ loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
+ loadFunc.setUDFContextSignature(getAlias());
}
return loadFunc;
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class LOSplit extends LogicalRelationalOperator {
+
+    /**
+     * Creates a split operator belonging to the given plan.
+     * @param plan the owning operator plan
+     */
+    public LOSplit(OperatorPlan plan) {
+        super("LOSplit", plan);
+    }
+
+    /**
+     * The schema of a split is identical to that of its first predecessor.
+     * The value is also stored in {@code schema}.
+     * @return the predecessor's schema (may be null)
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator predecessor;
+        try {
+            Operator first = plan.getPredecessors(this).get(0);
+            predecessor = (LogicalRelationalOperator) first;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+        }
+        schema = predecessor.getSchema();
+        return schema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalPlanVisitor) {
+            ((LogicalPlanVisitor) v).visitLOSplit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other == null || !(other instanceof LOSplit)) {
+            return false;
+        }
+        return checkEquality((LOSplit) other);
+    }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class LOSplitOutput extends LogicalRelationalOperator {
+ private LogicalExpressionPlan filterPlan;
+ public LOSplitOutput(LogicalPlan plan) {
+ super("LOSplitOutput", plan);
+ }
+
+ public LOSplitOutput(LogicalPlan plan, LogicalExpressionPlan filterPlan) {
+ super("LOSplitOutput", plan);
+ this.filterPlan = filterPlan;
+ }
+
+ public LogicalExpressionPlan getFilterPlan() {
+ return filterPlan;
+ }
+
+ public void setFilterPlan(LogicalExpressionPlan filterPlan) {
+ this.filterPlan = filterPlan;
+ }
+
+ @Override
+ public LogicalSchema getSchema() {
+ LogicalRelationalOperator input = null;
+ try {
+ input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+ }catch(Exception e) {
+ throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+ }
+
+ schema = input.getSchema();
+ return schema;
+ }
+
+ @Override
+ public void accept(PlanVisitor v) throws IOException {
+ if (!(v instanceof LogicalPlanVisitor)) {
+ throw new IOException("Expected LogicalPlanVisitor");
+ }
+ ((LogicalPlanVisitor)v).visitLOSplitOutput(this);
+ }
+
+ @Override
+ public boolean isEqual(Operator other) {
+ if (other != null && other instanceof LOSplitOutput) {
+ LOSplitOutput os = (LOSplitOutput)other;
+ return filterPlan.isEqual(os.filterPlan) && checkEquality(os);
+ } else {
+ return false;
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class LOUnion extends LogicalRelationalOperator {
+
+    public LOUnion(OperatorPlan plan) {
+        super("LOUnion", plan);
+    }
+
+    /**
+     * Returns the schema of the union. All predecessors must have known
+     * schemas of the same length that are mutually equal. Otherwise the
+     * union schema is unknown.
+     * @return the first predecessor's schema, or null if it cannot be determined
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+        } catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOUnion.", e);
+        }
+        // No inputs: nothing to derive a schema from
+        if (inputs == null || inputs.isEmpty()) {
+            return null;
+        }
+
+        // If any predecessor's schema is null, or the lengths of the predecessors'
+        // schemas do not match, then the schema for union is null
+        int length = -1;
+        for (Operator input : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator)input).getSchema();
+            if (inputSchema == null) {
+                return null;
+            }
+            if (length == -1) {
+                // Remember the first input's length; later inputs must match it
+                length = inputSchema.size();
+            } else if (inputSchema.size() != length) {
+                return null;
+            }
+        }
+
+        // Check if all predecessors' schemas are compatible.
+        // TODO: Migrate all existing schema merging rules
+        LogicalSchema mergedSchema = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+        for (int i = 1; i < inputs.size(); i++) {
+            LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
+            if (!mergedSchema.isEqual(otherSchema)) {
+                return null;
+            }
+        }
+        return mergedSchema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOUnion(this);
+    }
+
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other instanceof LOUnion) {
+            return checkEquality((LOUnion)other);
+        }
+        return false;
+    }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Mon Mar 15 03:28:27 2010
@@ -23,12 +23,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
+
+import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -38,10 +41,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.experimental.logical.expression.BagDereferenceExpression;
import org.apache.pig.experimental.logical.expression.ExpToPhyTranslationVisitor;
import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
import org.apache.pig.experimental.logical.expression.ProjectExpression;
@@ -53,7 +60,10 @@ import org.apache.pig.experimental.plan.
import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
import org.apache.pig.experimental.plan.SubtreeDependencyOrderWalker;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -185,6 +195,7 @@ public class LogToPhyTranslationVisitor
exprOp.setResultType(s.getField(0).type);
}
exprOp.setColumn(load.getColNum());
+ exprOp.setStar(load.getProjection().isProjectStar());
// set input to POProject to the predecessor of foreach
@@ -221,8 +232,8 @@ public class LogToPhyTranslationVisitor
List<Operator> leaves = exps.get(i).getSinks();
for(Operator l: leaves) {
PhysicalOperator op = logToPhyMap.get(l);
- if (l instanceof ProjectExpression) {
- int input = ((ProjectExpression)l).getInputNum();
+ if (l instanceof ProjectExpression ) {
+ int input = ((ProjectExpression)l).getInputNum();
// for each sink projection, get its input logical plan and translate it
Operator pred = preds.get(input);
@@ -240,16 +251,15 @@ public class LogToPhyTranslationVisitor
// comes from expression plan
currentPlan.remove(leaf);
logToPhyMap.remove(pred);
+
((POProject)op).setColumn( ((POProject)leaf).getColumn() );
-
+ ((POProject)op).setStar(((POProject)leaf).isStar());
+
}else{
currentPlan.connect(leaf, op);
}
}
}
-
-
-
innerPlans.add(currentPlan);
}
@@ -416,6 +426,127 @@ public class LogToPhyTranslationVisitor
}
@Override
+ public void visitLOCogroup( LOCogroup cg ) throws IOException {
+ if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
+ translateCollectedCogroup(cg);
+ } else {
+ translateRegularCogroup(cg);
+ }
+ }
+
+    /**
+     * Translates a regular (co)group into one POLocalRearrange per input,
+     * all feeding a POGlobalRearrange that is connected to a POPackage.
+     * The POPackage is registered in logToPhyMap as the translation of cg.
+     *
+     * @param cg the logical cogroup; its predecessors must already be in logToPhyMap
+     * @throws IOException if the physical operators cannot be configured or connected
+     */
+    private void translateRegularCogroup(LOCogroup cg) throws IOException {
+        List<Operator> preds = plan.getPredecessors(cg);
+
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() );
+        poGlobal.setAlias(cg.getAlias());
+        POPackage poPackage = new POPackage(new OperatorKey(DEFAULT_SCOPE, nodeGen
+                .getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam());
+        poPackage.setAlias(cg.getAlias());
+        currentPlan.add(poGlobal);
+        currentPlan.add(poPackage);
+
+        try {
+            currentPlan.connect(poGlobal, poPackage);
+        } catch (PlanException e1) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+        }
+
+        Byte type = null;
+        for( int i = 0 ; i < preds.size(); i++ ) {
+            // Copy rather than cast: the MultiMap only promises a Collection
+            List<LogicalExpressionPlan> exprPlans =
+                    new ArrayList<LogicalExpressionPlan>(cg.getExpressionPlans().get(i));
+
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                    DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() );
+            physOp.setAlias(cg.getAlias());
+
+            List<PhysicalPlan> pExprPlans = translateExpressionPlans( cg, exprPlans );
+
+            try {
+                physOp.setPlans(pExprPlans);
+            } catch (PlanException pe) {
+                int errCode = 2071;
+                String msg = "Problem with setting up local rearrange's plans.";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+            }
+            try {
+                physOp.setIndex(i);
+            } catch (ExecException e1) {
+                // Keep the error code and the underlying cause instead of a bare IOException
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+            // Several key expressions are combined into a tuple key; a single key
+            // keeps the result type of its expression's leaf.
+            if (exprPlans.size() > 1) {
+                type = DataType.TUPLE;
+            } else {
+                type = pExprPlans.get(0).getLeaves().get(0).getResultType();
+            }
+            physOp.setKeyType(type);
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(logToPhyMap.get(preds.get(i)), physOp);
+                currentPlan.connect(physOp, poGlobal);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+
+        // The package uses the key type computed for the last input
+        poPackage.setKeyType(type);
+        poPackage.setResultType(DataType.TUPLE);
+        poPackage.setNumInps(preds.size());
+        poPackage.setInner(cg.getInner());
+        logToPhyMap.put(cg, poPackage);
+    }
+
+    /**
+     * Translates a collected group into a single POCollectedGroup that is
+     * attached to the physical operator of the cogroup's only input and
+     * registered in logToPhyMap.
+     *
+     * @param cg the logical cogroup (group type COLLECTED)
+     * @throws IOException if the operator cannot be configured or connected
+     */
+    private void translateCollectedCogroup(LOCogroup cg) throws IOException {
+        // A collected group can have only one input
+        LogicalRelationalOperator input =
+                (LogicalRelationalOperator) plan.getPredecessors(cg).get(0);
+        List<LogicalExpressionPlan> keyPlans =
+                (List<LogicalExpressionPlan>) cg.getExpressionPlans().get(0);
+
+        POCollectedGroup groupOp = new POCollectedGroup(new OperatorKey(
+                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+        groupOp.setAlias(cg.getAlias());
+        List<PhysicalPlan> physKeyPlans = translateExpressionPlans(cg, keyPlans);
+
+        try {
+            groupOp.setPlans(physKeyPlans);
+        } catch (PlanException pe) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Problem with setting up map group's plans.", 2071, PigException.BUG, pe);
+        }
+
+        // Multiple keys form a tuple key; otherwise use the key expression's result type
+        byte keyType = (keyPlans.size() > 1)
+                ? DataType.TUPLE
+                : physKeyPlans.get(0).getLeaves().get(0).getResultType();
+        groupOp.setKeyType(keyType);
+        groupOp.setResultType(DataType.TUPLE);
+
+        currentPlan.add(groupOp);
+
+        try {
+            currentPlan.connect(logToPhyMap.get(input), groupOp);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+        }
+
+        logToPhyMap.put(cg, groupOp);
+    }
+
+ @Override
public void visitLOJoin(LOJoin loj) throws IOException {
String scope = DEFAULT_SCOPE;
// System.err.println("Entering Join");
@@ -448,23 +579,6 @@ public class LogToPhyTranslationVisitor
// Convert the expression plan into physical Plan
List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
-// currentPlans.push(currentPlan);
-// for (LogicalExpressionPlan lp : plans) {
-// currentPlan = new PhysicalPlan();
-//
-// // We spawn a new Dependency Walker and use it
-// PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
-// pushWalker(childWalker);
-// // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
-// currentWalker.walk(
-// new ExpToPhyTranslationVisitor(currentWalker.getPlan(),
-// childWalker) );
-//
-// exprPlans.add(currentPlan);
-// popWalker();
-// }
-// currentPlan = currentPlans.pop();
-
ppLists.add(exprPlans);
joinPlans.put(physOp, exprPlans);
@@ -745,6 +859,128 @@ public class LogToPhyTranslationVisitor
// System.err.println("Exiting Join");
}
+    /**
+     * Translates an LOUnion into a POUnion producing bags and connects the
+     * physical operator of every logical input to it.
+     * @param loUnion the logical union being visited
+     * @throws IOException if wiring the physical plan fails
+     */
+    @Override
+    public void visitLOUnion(LOUnion loUnion) throws IOException {
+        POUnion union = new POUnion(
+                new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),
+                loUnion.getRequestedParallelisam());
+        union.setAlias(loUnion.getAlias());
+        currentPlan.add(union);
+        union.setResultType(DataType.BAG);
+        logToPhyMap.put(loUnion, union);
+
+        // every logical input already has a physical counterpart; feed each into the union
+        for (Operator input : plan.getPredecessors(loUnion)) {
+            try {
+                currentPlan.connect(logToPhyMap.get(input), union);
+            } catch (PlanException e) {
+                throw new LogicalToPhysicalTranslatorException(
+                        "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+            }
+        }
+    }
+
+ @Override
+ public void visitLOSplit(LOSplit loSplit) throws IOException {
+ String scope = DEFAULT_SCOPE;
+ POSplit physOp = new POSplit(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), loSplit.getRequestedParallelisam());
+ physOp.setAlias(loSplit.getAlias());
+ FileSpec splStrFile;
+ try {
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),new FuncSpec(BinStorage.class.getName()));
+ } catch (IOException e1) {
+ byte errSrc = pc.getErrorSource();
+ int errCode = 0;
+ switch(errSrc) {
+ case PigException.BUG:
+ errCode = 2016;
+ break;
+ case PigException.REMOTE_ENVIRONMENT:
+ errCode = 6002;
+ break;
+ case PigException.USER_ENVIRONMENT:
+ errCode = 4003;
+ break;
+ }
+ String msg = "Unable to obtain a temporary path." ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, errSrc, e1);
+
+ }
+ physOp.setSplitStore(splStrFile);
+ logToPhyMap.put(loSplit, physOp);
+
+ currentPlan.add(physOp);
+
+ List<Operator> op = plan.getPredecessors(loSplit);
+ PhysicalOperator from;
+
+ if(op != null) {
+ from = logToPhyMap.get(op.get(0));
+ } else {
+ int errCode = 2051;
+ String msg = "Did not find a predecessor for Split." ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+ }
+
+ try {
+ currentPlan.connect(from, physOp);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+    /**
+     * Translates an LOSplitOutput into a POFilter producing bags. The split
+     * output's filter plan is translated into a separate PhysicalPlan that
+     * becomes the POFilter's condition. The POFilter is then connected to the
+     * physical operator of the split output's single input.
+     * @param loSplitOutput the logical split output being visited
+     * @throws IOException if expression translation fails, the operator has no
+     *         predecessor (2051), or plan wiring fails (2015)
+     */
+    @Override
+    public void visitLOSplitOutput(LOSplitOutput loSplitOutput) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), loSplitOutput.getRequestedParallelisam());
+        poFilter.setAlias(loSplitOutput.getAlias());
+        poFilter.setResultType(DataType.BAG);
+        currentPlan.add(poFilter);
+        logToPhyMap.put(loSplitOutput, poFilter);
+
+        // translate the filter condition into its own plan, then restore the outer plan
+        currentPlans.push(currentPlan);
+        currentPlan = new PhysicalPlan();
+
+        PlanWalker childWalker = new ReverseDependencyOrderWalker(loSplitOutput.getFilterPlan());
+        pushWalker(childWalker);
+        currentWalker.walk(
+                new ExpToPhyTranslationVisitor( currentWalker.getPlan(),
+                        childWalker, loSplitOutput, currentPlan, logToPhyMap ) );
+        popWalker();
+
+        poFilter.setPlan(currentPlan);
+        currentPlan = currentPlans.pop();
+
+        List<Operator> op = loSplitOutput.getPlan().getPredecessors(loSplitOutput);
+        // an empty list must be reported the same way as a missing one; otherwise
+        // op.get(0) would fail with an IndexOutOfBoundsException instead of error 2051
+        if (op == null || op.isEmpty()) {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Filter." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        PhysicalOperator from = logToPhyMap.get(op.get(0));
+
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
/**
* updates plan with check for empty bag and if bag is empty to flatten a bag
* with as many null's as dictated by the schema
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Mon Mar 15 03:28:27 2010
@@ -18,6 +18,10 @@
package org.apache.pig.experimental.logical.relational;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
import org.apache.pig.experimental.plan.BaseOperatorPlan;
import org.apache.pig.experimental.plan.OperatorPlan;
@@ -50,5 +54,4 @@ public class LogicalPlan extends BaseOpe
return super.isEqual(other);
}
-
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java Mon Mar 15 03:28:27 2010
@@ -62,4 +62,16 @@ public abstract class LogicalPlanVisitor
public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
}
+
+ /**
+  * Visit an LOCogroup. The default implementation does nothing; visitors
+  * that need to act on cogroups override it.
+  * @param loCogroup the cogroup operator being visited
+  * @throws IOException if an overriding implementation fails
+  */
+ public void visitLOCogroup(LOCogroup loCogroup) throws IOException {
+ }
+
+ /**
+  * Visit an LOSplit. The default implementation does nothing; visitors
+  * that need to act on splits override it.
+  * @param loSplit the split operator being visited
+  * @throws IOException if an overriding implementation fails
+  */
+ public void visitLOSplit(LOSplit loSplit) throws IOException {
+ }
+
+ /**
+  * Visit an LOSplitOutput. The default implementation does nothing; visitors
+  * that need to act on split outputs override it.
+  * @param loSplitOutput the split output operator being visited
+  * @throws IOException if an overriding implementation fails
+  */
+ public void visitLOSplitOutput(LOSplitOutput loSplitOutput) throws IOException {
+ }
+
+ /**
+  * Visit an LOUnion. The default implementation does nothing; visitors
+  * that need to act on unions override it.
+  * @param loUnion the union operator being visited
+  * @throws IOException if an overriding implementation fails
+  */
+ public void visitLOUnion(LOUnion loUnion) throws IOException {
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Mon Mar 15 03:28:27 2010
@@ -124,7 +124,7 @@ abstract public class LogicalRelationalO
* is defined here as having equal schemas and predecessors that are equal.
* This is intended to be used by operators' equals methods.
* @param other LogicalRelationalOperator to compare predecessors against
- * @return true if the equals() methods of this node's predecessor(s) returns
+ * @return true if the isEqual() methods of this node's predecessor(s) return
* true when invoked with other's predecessor(s).
*/
protected boolean checkEquality(LogicalRelationalOperator other) {
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Mon Mar 15 03:28:27 2010
@@ -63,7 +63,7 @@ public class LogicalSchema {
return false;
}
}
-
+
public String toString() {
if( type == DataType.BAG ) {
if( schema == null ) {
@@ -182,6 +182,32 @@ public class LogicalSchema {
}
/**
+ * Look for the index of the field that contains the specified uid
+ * @param uid the uid to look for
+ * @return the index of the field, -1 if not found
+ */
+ public int findField(long uid) {
+
+ for(int i=0; i< size(); i++) {
+ LogicalFieldSchema f = getField(i);
+ // if this field has the same uid, then return this field
+ if (f.uid == uid) {
+ return i;
+ }
+
+ // if this field has a schema, check its schema
+ if (f.schema != null) {
+ if (f.schema.findField(uid) != -1) {
+ return i;
+ }
+ }
+ }
+
+ return -1;
+ }
+
+
+ /**
* Merge two schemas.
* @param s1
* @param s2
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+
+/**
+ * Unchecked exception (a {@link RuntimeException}) named for the situation in
+ * which a schema that an operation relies on has not been defined.
+ */
+public class SchemaNotDefinedException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of the failure
+     */
+    public SchemaNotDefinedException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param cause the underlying exception
+     */
+    public SchemaNotDefinedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the underlying exception; its string form becomes the message
+     */
+    public SchemaNotDefinedException(Throwable cause) {
+        super(cause);
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Rule that inserts an LOForEach directly after an operator when the uids in
+ * its ColumnPruneHelper.OUTPUTUIDS annotation map to fewer columns than the
+ * operator's schema has. The foreach projects only those required columns.
+ */
+public class AddForEach extends WholePlanRule {
+    /** Annotation key under which shouldAdd() stores the required column indexes. */
+    protected static final String REQUIREDCOLS = "AddForEach:RequiredColumns";
+
+    public AddForEach(String n) {
+        super(n);
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new AddForEachTransformer();
+    }
+
+    public class AddForEachTransformer extends Transformer {
+        // operator chosen by check(); transform() adds the LOForEach after it
+        LogicalRelationalOperator opForAdd;
+        // operators touched by transform(), returned from reportChanges()
+        OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            Iterator<Operator> iter = matched.getOperators();
+            while (iter.hasNext()) {
+                LogicalRelationalOperator op = (LogicalRelationalOperator) iter.next();
+                if (shouldAdd(op)) {
+                    opForAdd = op;
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        // adds op and, recursively, every operator downstream of it to subPlan
+        private void addSuccessors(Operator op) throws IOException {
+            subPlan.add(op);
+            List<Operator> ll = op.getPlan().getSuccessors(op);
+            if (ll != null) {
+                for (Operator suc : ll) {
+                    addSuccessors(suc);
+                }
+            }
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            addForeach(opForAdd);
+
+            subPlan = new OperatorSubPlan(currentPlan);
+            addSuccessors(opForAdd);
+        }
+
+        // Checks whether an LOForEach should be added after the logical operator.
+        // On success the required column indexes are stored under REQUIREDCOLS.
+        @SuppressWarnings("unchecked")
+        private boolean shouldAdd(LogicalRelationalOperator op) throws IOException {
+            if (op instanceof LOForEach) {
+                return false;
+            }
+            Set<Long> output = (Set<Long>) op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+
+            if (output == null) {
+                return false;
+            }
+
+            LogicalSchema s = op.getSchema();
+            if (s == null) {
+                return false;
+            }
+
+            // check if there is already a foreach; an empty successor list is
+            // treated like a missing one instead of failing on get(0)
+            List<Operator> ll = op.getPlan().getSuccessors(op);
+            if (ll != null && !ll.isEmpty() && ll.get(0) instanceof LOForEach) {
+                return false;
+            }
+
+            Set<Integer> cols = new HashSet<Integer>();
+            for (long uid : output) {
+                int col = s.findField(uid);
+                if (col < 0) {
+                    throw new RuntimeException("Uid " + uid + " is not in the schema of " + op.getName());
+                }
+                cols.add(col);
+            }
+
+            if (cols.size() < s.size()) {
+                op.annotate(REQUIREDCOLS, cols);
+                return true;
+            }
+
+            return false;
+        }
+
+        // Inserts a new LOForEach between op and all of its successors. Its inner
+        // plan has one LOInnerLoad per column in the REQUIREDCOLS annotation,
+        // each feeding an LOGenerate that projects it.
+        @SuppressWarnings("unchecked")
+        private void addForeach(LogicalRelationalOperator op) throws IOException {
+            LOForEach foreach = new LOForEach(op.getPlan());
+
+            // add foreach to the base plan
+            LogicalPlan p = (LogicalPlan) op.getPlan();
+            p.add(foreach);
+            List<Operator> next = p.getSuccessors(op);
+            if (next != null) {
+                // iterate over a copy; disconnect() presumably edits the live successor list
+                Operator[] nextArray = next.toArray(new Operator[0]);
+                for (Operator n : nextArray) {
+                    Pair<Integer, Integer> pos = p.disconnect(op, n);
+                    p.connect(foreach, pos.first, n, pos.second);
+                }
+            }
+
+            p.connect(op, foreach);
+
+            LogicalPlan innerPlan = new LogicalPlan();
+            foreach.setInnerPlan(innerPlan);
+
+            // get output columns
+            Set<Integer> cols = (Set<Integer>) op.getAnnotation(REQUIREDCOLS);
+
+            // build foreach inner plan; exps is filled below after being handed to gen
+            List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+            LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[cols.size()]);
+            innerPlan.add(gen);
+
+            LogicalSchema schema = op.getSchema();
+            // i walks the operator's schema, j numbers the generate inputs that are kept
+            for (int i = 0, j = 0; i < schema.size(); i++) {
+                if (!cols.contains(i)) {
+                    continue;
+                }
+
+                LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
+                innerLoad.getProjection().setUid(foreach);
+                innerPlan.add(innerLoad);
+                innerPlan.connect(innerLoad, gen);
+
+                LogicalExpressionPlan exp = new LogicalExpressionPlan();
+                ProjectExpression prj = new ProjectExpression(exp, schema.getField(i).type, j++, 0);
+                prj.setUid(gen);
+                exp.add(prj);
+                exps.add(exp);
+            }
+
+        }
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOCogroup;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This rule prunes unused columns and map keys and pushes the result down to
+ * the loaders. It relies on MapKeysPruneHelper to calculate which map keys are
+ * required from a loader and on ColumnPruneHelper to calculate which columns
+ * are required from a loader. It then combines both results into one
+ * RequiredFieldList per loader.
+ */
+public class ColumnMapKeyPrune extends WholePlanRule {
+    // set by the first check() so that the rule does its work only once
+    private boolean hasRun;
+
+    public ColumnMapKeyPrune(String n) {
+        super(n);
+        hasRun = false;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new ColumnMapKeyPruneTransformer();
+    }
+
+    public class ColumnMapKeyPruneTransformer extends Transformer {
+        private MapKeysPruneHelper mapKeyHelper;
+        private ColumnPruneHelper columnHelper;
+        private boolean columnPrune;
+        private boolean mapKeyPrune;
+
+        /*
+         * Required columns and map keys for each LOLoad:
+         *   RequiredMapKeys --> Map<Integer, Set<String>>
+         *   RequiredColumns --> Set<Integer>
+         *
+         * The integers are column indexes.
+         */
+        private Map<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>> requiredItems =
+                new HashMap<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>>();
+
+        private OperatorSubPlan subPlan = null;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            // only run this rule once
+            if (hasRun) {
+                return false;
+            }
+
+            hasRun = true;
+            mapKeyHelper = new MapKeysPruneHelper(matched);
+            columnHelper = new ColumnPruneHelper(matched);
+
+            // check if map keys can be pruned
+            mapKeyPrune = mapKeyHelper.check();
+            // check if columns can be pruned
+            columnPrune = columnHelper.check();
+
+            return mapKeyPrune || columnPrune;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        // Builds subPlan from the operators changed by both helpers. It then
+        // fills requiredItems from the annotations the helpers left on each
+        // source (LOLoad) of the plan.
+        @SuppressWarnings("unchecked")
+        private void merge() {
+            // combine two subplans
+            subPlan = new OperatorSubPlan(currentPlan);
+            if (columnPrune) {
+                Iterator<Operator> iter = columnHelper.reportChanges().getOperators();
+                while (iter.hasNext()) {
+                    subPlan.add(iter.next());
+                }
+            }
+
+            if (mapKeyPrune) {
+                Iterator<Operator> iter = mapKeyHelper.reportChanges().getOperators();
+                while (iter.hasNext()) {
+                    subPlan.add(iter.next());
+                }
+            }
+
+            // combine annotations
+            for (Operator source : currentPlan.getSources()) {
+                Map<Integer, Set<String>> mapKeys =
+                        (Map<Integer, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+                Set<Integer> requiredColumns = null;
+                if (source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS) != null) {
+                    requiredColumns = new HashSet<Integer>((Set<Integer>) source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS));
+                }
+
+                // We dont have any information so skip
+                if (requiredColumns == null && mapKeys == null) {
+                    continue;
+                }
+
+                if (requiredColumns != null && mapKeys != null) {
+
+                    Set<Integer> duplicatedCols = new HashSet<Integer>();
+
+                    // Remove the columns already marked by MapKeys
+                    for (Integer col : requiredColumns) {
+                        if (mapKeys.containsKey(col)) {
+                            duplicatedCols.add(col);
+                        }
+                    }
+                    requiredColumns.removeAll(duplicatedCols);
+                } else if (mapKeys != null && requiredColumns == null) {
+                    // This is the case where only mapKeys can be pruned. And none
+                    // of the columns can be pruned. So we add all columns to the
+                    // requiredcolumns part
+                    requiredColumns = new HashSet<Integer>();
+                    for (int i = 0; i < ((LogicalRelationalOperator) source).getSchema().size(); i++) {
+                        if (!mapKeys.containsKey(i)) {
+                            requiredColumns.add(i);
+                        }
+                    }
+                }
+
+                requiredItems.put((LOLoad) source, new Pair<Map<Integer, Set<String>>, Set<Integer>>(mapKeys, requiredColumns));
+            }
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            merge();
+
+            ColumnPruneVisitor v = new ColumnPruneVisitor(subPlan);
+            v.visit();
+        }
+
+
+        // visitor to change the plan to remove unnecessary columns
+        private class ColumnPruneVisitor extends LogicalPlanVisitor {
+            public ColumnPruneVisitor(OperatorPlan plan) {
+                super(plan, new ReverseDependencyOrderWalker(plan));
+            }
+
+            // Pushes the required fields of this load into its LoadFunc. If the
+            // loader cannot prune columns, an LOForEach is inserted after the
+            // load instead; otherwise the load's script schema is narrowed.
+            public void visitLOLoad(LOLoad load) throws IOException {
+                if (!requiredItems.containsKey(load)) {
+                    return;
+                }
+
+                Pair<Map<Integer, Set<String>>, Set<Integer>> required =
+                        requiredItems.get(load);
+
+                RequiredFieldList requiredFields = new RequiredFieldList();
+
+                LogicalSchema s = load.getSchema();
+                for (int i = 0; i < s.size(); i++) {
+                    RequiredField requiredField = null;
+                    // As we have done processing ahead, we assume that
+                    // a column is not present in both ColumnPruner and
+                    // MapPruner
+                    if (required.first != null && required.first.containsKey(i)) {
+                        requiredField = new RequiredField();
+                        requiredField.setIndex(i);
+                        requiredField.setType(s.getField(i).type);
+                        List<RequiredField> subFields = new ArrayList<RequiredField>();
+                        for (String key : required.first.get(i)) {
+                            RequiredField subField = new RequiredField(key, -1, null, DataType.BYTEARRAY);
+                            subFields.add(subField);
+                        }
+                        requiredField.setSubFields(subFields);
+                        requiredFields.add(requiredField);
+                    }
+                    if (required.second != null && required.second.contains(i)) {
+                        requiredField = new RequiredField();
+                        requiredField.setIndex(i);
+                        requiredField.setType(s.getField(i).type);
+                        requiredFields.add(requiredField);
+                    }
+                }
+
+                log.info("Loader for " + load.getAlias() + " is pruned. Load fields " + requiredFields);
+                for (RequiredField rf : requiredFields.getFields()) {
+                    List<RequiredField> sub = rf.getSubFields();
+                    if (sub != null) {
+                        log.info("Map key required for " + load.getAlias() + ": $" + rf.getIndex() + "->" + sub);
+                    }
+                }
+
+                LoadPushDown.RequiredFieldResponse response = null;
+                try {
+                    LoadFunc loadFunc = load.getLoadFunc();
+                    if (loadFunc instanceof LoadPushDown) {
+                        response = ((LoadPushDown) loadFunc).pushProjection(requiredFields);
+                    }
+
+                } catch (FrontendException e) {
+                    log.warn("pushProjection on " + load + " throw an exception, skip it");
+                }
+
+                // Loader does not support column pruning, insert foreach
+                if (columnPrune) {
+                    if (response == null || !response.getRequiredFieldResponse()) {
+                        LogicalPlan p = (LogicalPlan) load.getPlan();
+                        List<Operator> succs = p.getSuccessors(load);
+                        // nothing downstream of this load: there is nowhere to insert a foreach
+                        if (succs == null || succs.isEmpty()) {
+                            return;
+                        }
+                        Operator next = succs.get(0);
+                        // if there is already a LOForEach after load, we don't need to
+                        // add another LOForEach
+                        if (next instanceof LOForEach) {
+                            return;
+                        }
+
+                        LOForEach foreach = new LOForEach(load.getPlan());
+
+                        // add foreach to the base plan
+                        p.add(foreach);
+
+                        Pair<Integer, Integer> disconnectedPos = p.disconnect(load, next);
+                        p.connect(load, disconnectedPos.first.intValue(), foreach, 0);
+                        p.connect(foreach, 0, next, disconnectedPos.second.intValue());
+
+                        // add foreach to the subplan
+                        subPlan.add(foreach);
+
+                        LogicalPlan innerPlan = new LogicalPlan();
+                        foreach.setInnerPlan(innerPlan);
+
+                        // build foreach inner plan
+                        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+                        LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
+                        innerPlan.add(gen);
+
+                        for (int i = 0; i < requiredFields.getFields().size(); i++) {
+                            LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
+                            LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
+                            innerLoad.getProjection().setUid(foreach);
+                            innerPlan.add(innerLoad);
+                            innerPlan.connect(innerLoad, gen);
+
+                            LogicalExpressionPlan exp = new LogicalExpressionPlan();
+                            ProjectExpression prj = new ProjectExpression(exp, rf.getType(), i, 0);
+                            prj.setUid(gen);
+                            exp.add(prj);
+                            exps.add(exp);
+                        }
+
+                    } else {
+                        // columns are pruned, reset schema for LOLoader
+                        LogicalSchema newSchema = new LogicalSchema();
+                        List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
+                        for (int i = 0; i < fieldList.size(); i++) {
+                            newSchema.addField(s.getField(fieldList.get(i).getIndex()));
+                        }
+
+                        load.setScriptSchema(newSchema);
+                    }
+                }
+            }
+
+
+            public void visitLOFilter(LOFilter filter) throws IOException {
+            }
+
+            public void visitLOStore(LOStore store) throws IOException {
+            }
+
+            public void visitLOCogroup(LOCogroup cg) throws IOException {
+            }
+
+            public void visitLOJoin(LOJoin join) throws IOException {
+            }
+
+            // Removes the inner-plan branches of the foreach that read columns
+            // not in the INPUTUIDS annotation. It drops the generate expressions
+            // that project those branches and renumbers the remaining projections
+            // and flatten flags.
+            @SuppressWarnings("unchecked")
+            public void visitLOForEach(LOForEach foreach) throws IOException {
+                if (!columnPrune) {
+                    return;
+                }
+
+                // get column numbers from input uids
+                Set<Long> input = (Set<Long>) foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
+                LogicalRelationalOperator op = (LogicalRelationalOperator) foreach.getPlan().getPredecessors(foreach).get(0);
+                Set<Integer> cols = columnHelper.getColumns(op.getSchema(), input);
+
+                LogicalPlan innerPlan = foreach.getInnerPlan();
+                LOGenerate gen = (LOGenerate) innerPlan.getSinks().get(0);
+
+                // clean up the predecessors of LOGenerate; a generate without
+                // inputs has nothing to prune
+                List<Operator> ll = innerPlan.getPredecessors(gen);
+                int numInputs = (ll == null) ? 0 : ll.size();
+                List<Operator> toRemove = new ArrayList<Operator>();
+                for (int i = 0; i < numInputs; i++) {
+
+                    // if the LOInnerLoads for this subplan are all in the column set,
+                    // no change required, keep going
+                    if (checkInnerLoads((LogicalRelationalOperator) ll.get(i), cols)) {
+                        continue;
+                    }
+
+                    // clean up and adjust expression plans
+                    Iterator<LogicalExpressionPlan> iter = gen.getOutputPlans().iterator();
+                    // current position of exp in the (shrinking) output plan list
+                    int j = -1;
+                    while (iter.hasNext()) {
+                        j++;
+                        LogicalExpressionPlan exp = iter.next();
+                        List<Operator> sinks = exp.getSinks();
+                        for (Operator s : sinks) {
+                            if (s instanceof ProjectExpression) {
+                                int inputNo = ((ProjectExpression) s).getInputNum();
+                                // inputNo was already decremented once per earlier
+                                // removal, so add toRemove.size() to recover the
+                                // original input index
+                                if (inputNo + toRemove.size() == i) {
+                                    // if this expression has this input that is to be removed,
+                                    // then remove this expression plan
+                                    iter.remove();
+
+                                    // adjust flatten flags
+                                    boolean[] flags = gen.getFlattenFlags();
+                                    for (int k = j; k < flags.length - 1; k++) {
+                                        flags[k] = flags[k + 1];
+                                    }
+                                    // the next expression now sits at index j; undo the
+                                    // upcoming increment so its flag is shifted correctly
+                                    // if it is removed too
+                                    j--;
+                                    break;
+                                } else if (inputNo + toRemove.size() > i) {
+                                    // adjust input number for all projections whose
+                                    // input number is after the one to be removed
+                                    ((ProjectExpression) s).setInputNum(inputNo - 1);
+                                }
+                            }
+                        }
+                    }
+
+                    // this LOInnerLoad and its successors should be removed, add to the remove list
+                    toRemove.add(ll.get(i));
+
+                }
+
+                for (Operator pred : toRemove) {
+                    removeSubTree((LogicalRelationalOperator) pred);
+                }
+
+                // trim the flatten flags in case some expressions are removed
+                boolean[] flatten = new boolean[gen.getOutputPlans().size()];
+                System.arraycopy(gen.getFlattenFlags(), 0, flatten, 0, flatten.length);
+                gen.setFlattenFlags(flatten);
+            }
+
+            public void visitLOGenerate(LOGenerate gen) throws IOException {
+            }
+
+            public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+            }
+
+            // check if the column number in every LOInnerLoad at or above op is inside a given column index set
+            protected boolean checkInnerLoads(LogicalRelationalOperator op, Set<Integer> cols) throws IOException {
+                if (op instanceof LOInnerLoad) {
+                    int col = ((LOInnerLoad) op).getColNum();
+                    if (!cols.contains(col)) {
+                        return false;
+                    }
+                }
+
+                List<Operator> preds = op.getPlan().getPredecessors(op);
+                if (preds != null) {
+                    for (Operator pred : preds) {
+                        if (!checkInnerLoads((LogicalRelationalOperator) pred, cols)) {
+                            return false;
+                        }
+                    }
+                }
+
+                return true;
+            }
+
+            // remove op together with all of its (transitive) predecessors from op's plan
+            protected void removeSubTree(LogicalRelationalOperator op) throws IOException {
+                LogicalPlan p = (LogicalPlan) op.getPlan();
+                List<Operator> ll = p.getPredecessors(op);
+                if (ll != null) {
+                    for (Operator pred : ll) {
+                        removeSubTree((LogicalRelationalOperator) pred);
+                    }
+                }
+
+                if (p.getSuccessors(op) != null) {
+                    Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);
+
+                    for (Operator s : succs) {
+                        p.disconnect(op, s);
+                    }
+                }
+
+                p.remove(op);
+            }
+
+        }
+    }
+
+}