You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC
svn commit: r982345 [5/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * Operator that maps one column of the LOForEach input into the inner plan
+ * of that LOForEach. It can only be used in the inner plan of LOForEach.
+ */
+public class LOInnerLoad extends LogicalRelationalOperator {
+    // Projection of the input column. Kept as a ProjectExpression in its own
+    // plan so the column number can be adjusted during optimization.
+    private ProjectExpression prj;
+    // The LOForEach whose inner plan contains this operator.
+    private LOForEach foreach;
+    // Set by getSchema() once the projected column is found to be a bag.
+    private boolean sourceIsBag = false;
+
+    /**
+     * @param plan    inner plan this operator belongs to
+     * @param foreach LOForEach that owns the inner plan
+     * @param colNum  column number of the foreach input to project
+     */
+    public LOInnerLoad(OperatorPlan plan, LOForEach foreach, int colNum) {
+        super("LOInnerLoad", plan);
+
+        // Store the column number as a ProjectExpression in a plan to be
+        // able to dynamically adjust the column number during optimization.
+        LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+        // NOTE(review): the 0 is presumably the input index of the projection
+        // (argument order: plan, input, column, attached operator) - verify
+        // against ProjectExpression.
+        prj = new ProjectExpression(exp, 0, colNum, foreach);
+        this.foreach = foreach;
+    }
+
+    /**
+     * Compute (and cache) the schema of the projected column.
+     * <p>
+     * If the column is a bag, this operator's fields are the bag's fields
+     * (or the fields of the bag's tuple when two-level access is required),
+     * the alias is taken from the bag, and {@link #sourceIsBag()} becomes
+     * true. Any other column yields a schema containing just that field.
+     *
+     * @return the schema, or null when it cannot be determined: the projected
+     *         field schema is unknown, the bag/tuple inner schema is unknown,
+     *         or resolving the projection raised an IOException
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        try {
+            LogicalFieldSchema fieldSchema = prj.getFieldSchema();
+            if (fieldSchema == null) {
+                return null;
+            }
+
+            if (fieldSchema.type != DataType.BAG) {
+                LogicalSchema single = new LogicalSchema();
+                single.addField(fieldSchema);
+                schema = single;
+                return schema;
+            }
+
+            sourceIsBag = true;
+            alias = fieldSchema.alias;
+
+            LogicalSchema bagSchema = fieldSchema.schema;
+            if (bagSchema == null) {
+                // The bag's inner schema is unknown.
+                return null;
+            }
+
+            // Fields come either from the bag directly or from its tuple.
+            LogicalSchema source = bagSchema;
+            if (bagSchema.isTwoLevelAccessRequired()) {
+                LogicalFieldSchema tupleSchema = bagSchema.getField(0);
+                if (tupleSchema == null || tupleSchema.schema == null) {
+                    return null;
+                }
+                source = tupleSchema.schema;
+            }
+
+            // Build into a local so a partially built schema is never cached.
+            LogicalSchema result = new LogicalSchema();
+            for (int i = 0; i < source.size(); i++) {
+                result.addField(source.getField(i));
+            }
+            schema = result;
+        } catch (IOException e) {
+            // TODO surface this error. For now the schema is treated as
+            // unknown and left uncached so that a later call can retry.
+            schema = null;
+        }
+        return schema;
+    }
+
+    /**
+     * @return the projection that selects the input column
+     */
+    public ProjectExpression getProjection() {
+        return prj;
+    }
+
+    /**
+     * Two inner loads are considered equal when they project the same column.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOInnerLoad)) {
+            return false;
+        }
+
+        return (getColNum() == ((LOInnerLoad)other).getColNum());
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    /**
+     * @return the projected column number, read from the projection
+     */
+    public int getColNum() {
+        return prj.getColNum();
+    }
+
+    /**
+     * Get the LOForEach operator that contains this operator as part of inner plan
+     * @return the LOForEach operator
+     */
+    public LOForEach getLOForEach() {
+        return foreach;
+    }
+
+    /**
+     * @return true if getSchema() found the projected column to be a bag;
+     *         false before getSchema() has run
+     */
+    public boolean sourceIsBag() {
+        return sourceIsBag;
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+
+public class LOJoin extends LogicalRelationalOperator {
+ private static final long serialVersionUID = 2L;
+
+ /**
+ * Enum for the type of join
+ */
+ public static enum JOINTYPE {
+ HASH, // Hash Join
+ REPLICATED, // Fragment Replicated join
+ SKEWED, // Skewed Join
+ MERGE // Sort Merge Join
+ };
+
+
+ /**
+ * LOJoin contains a list of logical operators corresponding to the
+ * relational operators and a list of generates for each relational
+ * operator. Each generate operator in turn contains a list of expressions
+ * for the columns that are projected
+ */
+ //private static Log log = LogFactory.getLog(LOJoin.class);
+ // expression plans for each input.
+ private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
+ // indicator for each input whether it is inner
+ private boolean[] mInnerFlags;
+ private JOINTYPE mJoinType; // Retains the type of the join
+
+ public LOJoin(LogicalPlan plan) {
+ super("LOJoin", plan);
+ }
+
+ public LOJoin(LogicalPlan plan,
+ MultiMap<Integer, LogicalExpressionPlan> joinPlans,
+ JOINTYPE jt,
+ boolean[] isInner) {
+ super("LOJoin", plan);
+ mJoinPlans = joinPlans;
+ mJoinType = jt;
+ mInnerFlags = isInner;
+ }
+
+ public boolean isInner(int inputIndex) {
+ return mInnerFlags[inputIndex];
+ }
+
+ public boolean[] getInnerFlags() {
+ return mInnerFlags;
+ }
+
+ public JOINTYPE getJoinType() {
+ return mJoinType;
+ }
+
+ public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
+ return mJoinPlans.get(inputIndex);
+ }
+
+ /**
+ * Get all of the expressions plans that are in this join.
+ * @return collection of all expression plans.
+ */
+ public Collection<LogicalExpressionPlan> getExpressionPlans() {
+ return mJoinPlans.values();
+ }
+
+ @Override
+ public LogicalSchema getSchema() {
+ // if schema is calculated before, just return
+ if (schema != null) {
+ return schema;
+ }
+
+ List<Operator> inputs = null;
+ try {
+ inputs = plan.getPredecessors(this);
+ if (inputs == null) {
+ return null;
+ }
+ }catch(Exception e) {
+ throw new RuntimeException("Unable to get predecessors of LOJoin operator. ", e);
+ }
+
+ List<LogicalSchema.LogicalFieldSchema> fss = new ArrayList<LogicalSchema.LogicalFieldSchema>();
+
+ for (Operator op : inputs) {
+ LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
+ // the schema of one input is unknown, so the join schema is unknown, just return
+ if (inputSchema == null) {
+ schema = null;
+ return schema;
+ }
+
+ for (int i=0; i<inputSchema.size(); i++) {
+ LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
+ LogicalSchema.LogicalFieldSchema newFS = null;
+ if(fs.alias != null) {
+ newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
+ } else {
+ newFS = new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid);
+ }
+ fss.add(newFS);
+ }
+ }
+
+ schema = new LogicalSchema();
+ for(LogicalSchema.LogicalFieldSchema fieldSchema: fss) {
+ schema.addField(fieldSchema);
+ }
+
+ return schema;
+ }
+
+ @Override
+ public void accept(PlanVisitor v) throws IOException {
+ if (!(v instanceof LogicalRelationalNodesVisitor)) {
+ throw new IOException("Expected LogicalPlanVisitor");
+ }
+ ((LogicalRelationalNodesVisitor)v).visit(this);
+
+ }
+
+ @Override
+ public boolean isEqual(Operator other) {
+ if (other != null && other instanceof LOJoin) {
+ LOJoin oj = (LOJoin)other;
+ if (mJoinType != oj.mJoinType) return false;
+ if (mInnerFlags.length != oj.mInnerFlags.length) return false;
+ for (int i = 0; i < mInnerFlags.length; i++) {
+ if (mInnerFlags[i] != oj.mInnerFlags[i]) return false;
+ }
+ if (!checkEquality(oj)) return false;
+
+ if (mJoinPlans.size() != oj.mJoinPlans.size()) return false;
+
+ // Now, we need to make sure that for each input we are projecting
+ // the same columns. This is slightly complicated since MultiMap
+ // doesn't return any particular order, so we have to find the
+ // matching input in each case.
+ for (Integer p : mJoinPlans.keySet()) {
+ Iterator<Integer> iter = oj.mJoinPlans.keySet().iterator();
+ int op = -1;
+ while (iter.hasNext()) {
+ op = iter.next();
+ if (p.equals(op)) break;
+ }
+ if (op != -1) {
+ Collection<LogicalExpressionPlan> c = mJoinPlans.get(p);
+ Collection<LogicalExpressionPlan> oc = oj.mJoinPlans.get(op);
+ if (c.size() != oc.size()) return false;
+
+ if (!(c instanceof List) || !(oc instanceof List)) {
+ throw new RuntimeException(
+ "Expected list of expression plans");
+ }
+ List<LogicalExpressionPlan> elist = (List<LogicalExpressionPlan>)c;
+ List<LogicalExpressionPlan> oelist = (List<LogicalExpressionPlan>)oc;
+ for (int i = 0; i < elist.size(); i++) {
+ if (!elist.get(i).isEqual(oelist.get(i))) return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical operator carrying a record limit. Its schema is the schema of its
+ * first (single) predecessor, passed through unchanged.
+ */
+public class LOLimit extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    // The limit value held by this operator.
+    private long mLimit;
+
+    /**
+     * @param plan  logical plan this operator is part of
+     * @param limit the limit value
+     */
+    public LOLimit(LogicalPlan plan, long limit) {
+        super("LOLimit", plan);
+        this.mLimit = limit;
+    }
+
+    /** @return the limit value */
+    public long getLimit() {
+        return this.mLimit;
+    }
+
+    /** @param limit the new limit value */
+    public void setLimit(long limit) {
+        this.mLimit = limit;
+    }
+
+    /**
+     * Delegates to the first predecessor's schema. Any failure while locating
+     * that predecessor is rethrown as a RuntimeException.
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator predecessor;
+        try {
+            Operator first = plan.getPredecessors(this).get(0);
+            predecessor = (LogicalRelationalOperator) first;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOLimit.", e);
+        }
+        return predecessor.getSchema();
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * Equal when the other operator is an LOLimit with the same limit that
+     * also passes checkEquality.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOLimit)) {
+            return false;
+        }
+        LOLimit that = (LOLimit) other;
+        if (that.getLimit() != mLimit) {
+            return false;
+        }
+        return checkEquality(that);
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical load operator. Its schema combines the schema given in the script
+ * with any schema determined from the loader, optionally narrowed to the
+ * required fields.
+ */
+public class LOLoad extends LogicalRelationalOperator {
+
+    private LogicalSchema scriptSchema;
+    private FileSpec fs;
+    private transient LoadFunc loadFunc;
+    transient private Configuration conf;
+    private LogicalSchema determinedSchema;
+    private List<Integer> requiredFields = null;
+    private boolean castInserted = false;
+    private LogicalSchema uidOnlySchema;
+
+    /**
+     *
+     * @param loader FileSpec for this load; its FuncSpec names the load
+     *        function to use.
+     * @param schema schema user specified in script, or null if not
+     *        specified.
+     * @param plan logical plan this load is part of.
+     * @param conf configuration stored with this load and returned by
+     *        {@link #getConfiguration()}.
+     */
+    public LOLoad(FileSpec loader, LogicalSchema schema, LogicalPlan plan, Configuration conf) {
+        super("LOLoad", plan);
+        scriptSchema = schema;
+        fs = loader;
+        this.conf = conf;
+    }
+
+    /**
+     * Lazily instantiate the load function from the FileSpec's FuncSpec and
+     * set its UDF context signature to this operator's alias.
+     *
+     * @return the load function instance
+     * @throws RuntimeException if the function does not implement LoadFunc
+     */
+    public LoadFunc getLoadFunc() {
+        try {
+            if (loadFunc == null) {
+                loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
+                loadFunc.setUDFContextSignature(getAlias());
+            }
+
+            return loadFunc;
+        } catch (ClassCastException cce) {
+            // keep the cause so the offending class is visible in the trace
+            throw new RuntimeException(fs.getFuncSpec() + " should implement the LoadFunc interface.", cce);
+        }
+    }
+
+    public void setScriptSchema(LogicalSchema schema) {
+        scriptSchema = schema;
+    }
+
+    /**
+     * @param requiredFields indexes of the fields to keep in the schema, or
+     *        null to keep all fields
+     */
+    public void setRequiredFields(List<Integer> requiredFields) {
+        this.requiredFields = requiredFields;
+    }
+
+    /**
+     * Get the schema for this load. The schema will be either be what was
+     * given by the user in the script or what the load functions getSchema
+     * call returned. Otherwise null will be returned, indicating that the
+     * schema is unknown.
+     * @return schema, or null if unknown
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null)
+            return schema;
+
+        // TODO get schema from LoaderMetadata interface.
+        // Only consult the metadata when no schema has been determined yet.
+        if (determinedSchema == null) {
+            determinedSchema = getSchemaFromMetaData();
+        }
+
+        LogicalSchema originalSchema = null;
+        if (scriptSchema != null && determinedSchema != null) {
+            originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema);
+        } else if (scriptSchema != null) {
+            originalSchema = scriptSchema;
+        } else if (determinedSchema != null) {
+            originalSchema = determinedSchema;
+        }
+
+        // Schema is unknown; nothing below applies.
+        if (originalSchema == null) {
+            return null;
+        }
+
+        if (isCastInserted()) {
+            for (int i = 0; i < originalSchema.size(); i++) {
+                LogicalSchema.LogicalFieldSchema fs = originalSchema.getField(i);
+                if (determinedSchema == null) {
+                    // Reset the loads field schema to byte array so that it
+                    // will reflect reality.
+                    fs.type = DataType.BYTEARRAY;
+                } else {
+                    // Reset the type to what determinedSchema says it is
+                    fs.type = determinedSchema.getField(i).type;
+                }
+            }
+        }
+
+        try {
+            uidOnlySchema = originalSchema.mergeUid(uidOnlySchema);
+        } catch (IOException e) {
+            // TODO Exception. NOTE(review): on failure uidOnlySchema keeps
+            // its previous value; consider surfacing this error.
+        }
+
+        if (requiredFields != null) {
+            schema = new LogicalSchema();
+            for (int i = 0; i < originalSchema.size(); i++) {
+                if (requiredFields.contains(i))
+                    schema.addField(originalSchema.getField(i));
+            }
+        } else {
+            schema = originalSchema;
+        }
+
+        return schema;
+    }
+
+    // Placeholder: no loader metadata is consulted yet.
+    private LogicalSchema getSchemaFromMetaData() {
+        return null;
+    }
+
+    public FileSpec getFileSpec() {
+        return fs;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    public LogicalSchema getDeterminedSchema() {
+        return determinedSchema;
+    }
+
+    /**
+     * Equal when the other operator is an LOLoad that passes checkEquality
+     * and has an equal (or equally null) FileSpec.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOLoad)) {
+            return false;
+        }
+        LOLoad ol = (LOLoad)other;
+        if (!checkEquality(ol)) return false;
+        return (fs == null) ? ol.fs == null : fs.equals(ol.fs);
+    }
+
+    public void setCastInserted(boolean flag) {
+        castInserted = flag;
+    }
+
+    public boolean isCastInserted() {
+        return castInserted;
+    }
+
+    public Configuration getConfiguration() {
+        return conf;
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.SortColInfo;
+import org.apache.pig.SortInfo;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+
+/**
+ * Logical sort operator: one expression plan per sort column, an
+ * ascending/descending flag per column, an optional user comparison
+ * function and an optional limit (-1 when no limit is set).
+ */
+public class LOSort extends LogicalRelationalOperator{
+    private List<Boolean> mAscCols;
+    private FuncSpec mSortFunc;
+    private boolean mIsStar = false;
+    private long limit;
+    private List<LogicalExpressionPlan> mSortColPlans;
+
+    /**
+     * @param plan         plan this operator belongs to
+     * @param sortColPlans one expression plan per sort column
+     * @param ascCols      per-column flag, true for ascending
+     * @param sortFunc     user comparison function, may be null
+     */
+    public LOSort(OperatorPlan plan, List<LogicalExpressionPlan> sortColPlans,
+                  List<Boolean> ascCols,
+                  FuncSpec sortFunc ) {
+        super("LOSort", plan);
+        mSortColPlans = sortColPlans;
+        mAscCols = ascCols;
+        mSortFunc = sortFunc;
+        limit = -1;
+    }
+
+    public List<LogicalExpressionPlan> getSortColPlans() {
+        return mSortColPlans;
+    }
+
+    public void setSortColPlans(List<LogicalExpressionPlan> sortPlans) {
+        mSortColPlans = sortPlans;
+    }
+
+    public List<Boolean> getAscendingCols() {
+        return mAscCols;
+    }
+
+    public void setAscendingCols(List<Boolean> ascCols) {
+        mAscCols = ascCols;
+    }
+
+    public FuncSpec getUserFunc() {
+        return mSortFunc;
+    }
+
+    public void setUserFunc(FuncSpec func) {
+        mSortFunc = func;
+    }
+
+    public boolean isStar() {
+        return mIsStar;
+    }
+
+    public void setStar(boolean b) {
+        mIsStar = b;
+    }
+
+    public void setLimit(long l) {
+        limit = l;
+    }
+
+    public long getLimit() {
+        return limit;
+    }
+
+    /** @return true if a limit other than -1 has been set */
+    public boolean isLimited() {
+        return (limit != -1);
+    }
+
+    /**
+     * @return the schema of the first predecessor, passed through unchanged
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to get predecessor of LOSort.", e);
+        }
+        return input.getSchema();
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    /**
+     * Describe the sort columns (name, index, order).
+     *
+     * @return the sort info
+     * @throws IOException if a sort column plan is not a single
+     *         ProjectExpression
+     */
+    public SortInfo getSortInfo() throws IOException {
+        LogicalSchema schema = this.getSchema();
+        List<SortColInfo> sortColInfoList = new ArrayList<SortColInfo>();
+        for (int i = 0; i < mSortColPlans.size(); i++) {
+            LogicalExpressionPlan lp = mSortColPlans.get(i);
+            Iterator<Operator> opsIterator = lp.getOperators();
+            List<Operator> opsList = new ArrayList<Operator>();
+            while (opsIterator.hasNext()) {
+                opsList.add(opsIterator.next());
+            }
+            if (opsList.size() != 1 || !(opsList.get(0) instanceof ProjectExpression)) {
+                // Guard the message: an empty plan has no first operator.
+                Object offending = opsList.isEmpty() ? "(empty plan)" : opsList.get(0);
+                throw new IOException("Unsupported operator in inner plan: " + offending);
+            }
+            ProjectExpression project = (ProjectExpression) opsList.get(0);
+            int sortColIndex = project.getColNum();
+            String sortColName = (schema == null) ? null :
+                schema.getField(sortColIndex).alias;
+            sortColInfoList.add(new SortColInfo(sortColName, sortColIndex,
+                mAscCols.get(i) ? SortColInfo.Order.ASCENDING :
+                    SortColInfo.Order.DESCENDING));
+        }
+        return new SortInfo(sortColInfoList);
+    }
+
+    /**
+     * Two sorts are equal when their ascending flags, user functions, star
+     * flag, limit and sort column plans (compared with isEqual, in order)
+     * all match and checkEquality holds.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOSort)) {
+            return false;
+        }
+        LOSort otherSort = (LOSort)other;
+        if (!objectsEqual(mAscCols, otherSort.getAscendingCols()))
+            return false;
+        if (!objectsEqual(mSortFunc, otherSort.getUserFunc()))
+            return false;
+        if (mIsStar != otherSort.isStar())
+            return false;
+        if (limit != otherSort.getLimit())
+            return false;
+        if (!sortPlansEqual(mSortColPlans, otherSort.getSortColPlans()))
+            return false;
+        return checkEquality(otherSort);
+    }
+
+    // Null-safe equals (the user sort function is commonly null).
+    private static boolean objectsEqual(Object a, Object b) {
+        return (a == null) ? (b == null) : a.equals(b);
+    }
+
+    // Element-wise, order-sensitive comparison of sort plans via isEqual.
+    private static boolean sortPlansEqual(List<LogicalExpressionPlan> a,
+                                          List<LogicalExpressionPlan> b) {
+        if (a == null || b == null) {
+            return a == b;
+        }
+        if (a.size() != b.size()) {
+            return false;
+        }
+        for (int i = 0; i < a.size(); i++) {
+            if (!a.get(i).isEqual(b.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical split operator. It exposes the schema of its first (single)
+ * predecessor unchanged.
+ */
+public class LOSplit extends LogicalRelationalOperator {
+
+    /**
+     * @param plan plan this operator belongs to
+     */
+    public LOSplit(OperatorPlan plan) {
+        super("LOSplit", plan);
+    }
+
+    /**
+     * Delegates to the first predecessor's schema. Any failure while locating
+     * that predecessor is rethrown as a RuntimeException.
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator source;
+        try {
+            Operator first = plan.getPredecessors(this).get(0);
+            source = (LogicalRelationalOperator) first;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+        }
+        return source.getSchema();
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /** Equal to another LOSplit that passes checkEquality. */
+    @Override
+    public boolean isEqual(Operator other) {
+        return (other instanceof LOSplit) && checkEquality((LOSplit) other);
+    }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+public class LOSplitOutput extends LogicalRelationalOperator {
+ private LogicalExpressionPlan filterPlan;
+ public LOSplitOutput(LogicalPlan plan) {
+ super("LOSplitOutput", plan);
+ }
+
+ public LOSplitOutput(LogicalPlan plan, LogicalExpressionPlan filterPlan) {
+ super("LOSplitOutput", plan);
+ this.filterPlan = filterPlan;
+ }
+
+ public LogicalExpressionPlan getFilterPlan() {
+ return filterPlan;
+ }
+
+ public void setFilterPlan(LogicalExpressionPlan filterPlan) {
+ this.filterPlan = filterPlan;
+ }
+
+ @Override
+ public LogicalSchema getSchema() {
+ LogicalRelationalOperator input = null;
+ try {
+ input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+ }catch(Exception e) {
+ throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+ }
+
+ return input.getSchema();
+ }
+
+ @Override
+ public void accept(PlanVisitor v) throws IOException {
+ if (!(v instanceof LogicalRelationalNodesVisitor)) {
+ throw new IOException("Expected LogicalPlanVisitor");
+ }
+ ((LogicalRelationalNodesVisitor)v).visit(this);
+ }
+
+ @Override
+ public boolean isEqual(Operator other) {
+ if (other != null && other instanceof LOSplitOutput) {
+ LOSplitOutput os = (LOSplitOutput)other;
+ return filterPlan.isEqual(os.filterPlan) && checkEquality(os);
+ } else {
+ return false;
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical store operator. Holds the output FileSpec and a StoreFuncInterface
+ * instantiated from that spec's FuncSpec.
+ */
+public class LOStore extends LogicalRelationalOperator {
+    private static final long serialVersionUID = 2L;
+
+    // Output specification; stays null when the one-argument constructor is used.
+    private FileSpec output;
+    // Built from output's FuncSpec; transient, so not written by Java serialization.
+    transient private StoreFuncInterface storeFunc;
+
+    /**
+     * Creates a store operator with neither an output spec nor a store function.
+     *
+     * @param plan the logical plan this operator belongs to
+     */
+    public LOStore(LogicalPlan plan) {
+        super("LOStore", plan);
+    }
+
+    /**
+     * Creates a store operator for the given output spec and instantiates the
+     * store function that the spec names.
+     *
+     * @param plan           the logical plan this operator belongs to
+     * @param outputFileSpec the output specification
+     * @throws RuntimeException if the store function cannot be instantiated
+     */
+    public LOStore(LogicalPlan plan, FileSpec outputFileSpec) {
+        super("LOStore", plan);
+        this.output = outputFileSpec;
+        this.storeFunc = instantiateStoreFunc(outputFileSpec);
+    }
+
+    // Instantiates the StoreFuncInterface from the spec's FuncSpec, wrapping any failure.
+    private static StoreFuncInterface instantiateStoreFunc(FileSpec spec) {
+        try {
+            return (StoreFuncInterface) PigContext.instantiateFuncFromSpec(spec.getFuncSpec());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to instantiate StoreFunc.", e);
+        }
+    }
+
+    /** @return the output specification, possibly null */
+    public FileSpec getOutputSpec() {
+        return output;
+    }
+
+    /** @return the store function instance, possibly null */
+    public StoreFuncInterface getStoreFunc() {
+        return storeFunc;
+    }
+
+    /**
+     * Recomputes the schema from the first predecessor on every call and
+     * records it in the inherited {@code schema} field.
+     *
+     * @throws RuntimeException if the predecessor or its schema cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        try {
+            LogicalRelationalOperator predecessor =
+                    (LogicalRelationalOperator) plan.getPredecessors(this).get(0);
+            schema = predecessor.getSchema();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOStore.", e);
+        }
+        return schema;
+    }
+
+    /**
+     * Hands this operator to the visitor's visit method.
+     *
+     * @throws IOException if {@code v} is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+        } else {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+    }
+
+    /**
+     * Two stores are equal when checkEquality() holds and their output specs
+     * are equal (both null counts as equal).
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOStore)) {
+            return false;
+        }
+        LOStore that = (LOStore) other;
+        if (!checkEquality(that)) {
+            return false;
+        }
+        // storeFunc is derived from the output spec, so comparing specs suffices.
+        if (output == null) {
+            return that.output == null;
+        }
+        return that.output != null && output.equals(that.output);
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical stream operator. Holds the StreamingCommand it represents and an
+ * associated ExecutableManager.
+ */
+public class LOStream extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    // The StreamingCommand object for the stream operator this operator represents.
+    private StreamingCommand command;
+    // Marked transient, so it is not written by Java serialization.
+    transient private ExecutableManager executableManager;
+
+    /**
+     * @param plan       the logical plan this operator belongs to
+     * @param exeManager the ExecutableManager to associate with this operator
+     * @param cmd        the StreamingCommand this operator represents
+     */
+    public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd) {
+        // The operator name must identify this class; it was previously the
+        // copy-pasted "LODistinct", which mislabeled every stream operator.
+        super("LOStream", plan);
+        command = cmd;
+        executableManager = exeManager;
+    }
+
+    /**
+     * Get the StreamingCommand object associated
+     * with this operator
+     *
+     * @return the StreamingCommand object
+     */
+    public StreamingCommand getStreamingCommand() {
+        return command;
+    }
+
+    /**
+     * @return the ExecutableManager
+     */
+    public ExecutableManager getExecutableManager() {
+        return executableManager;
+    }
+
+    /**
+     * Returns the first predecessor's schema, cached in {@code schema} after
+     * the first successful call.
+     * NOTE(review): any output schema declared on the streaming command is not
+     * consulted here; confirm that passing the input schema through is intended.
+     *
+     * @throws RuntimeException if the predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator) plan.getPredecessors(this).get(0);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOStream.", e);
+        }
+
+        schema = input.getSchema();
+        return schema;
+    }
+
+    /**
+     * Hands this operator to the visitor's visit method.
+     *
+     * @throws IOException if {@code v} is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor) v).visit(this);
+    }
+
+    /**
+     * Equality is decided by checkEquality() alone.
+     * NOTE(review): the StreamingCommand is not compared, so two streams with
+     * different commands may compare equal; confirm this is intended.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other instanceof LOStream) {
+            return checkEquality((LogicalRelationalOperator) other);
+        }
+        return false;
+    }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+
+/**
+ * Logical union operator. Its schema is derived from its inputs' schemas, and
+ * it keeps a mapping from each output field uid to the input field uids it
+ * merges.
+ */
+public class LOUnion extends LogicalRelationalOperator {
+
+    // uid mapping from output uid to input uid: each pair is (output uid, input uid).
+    // It persists across getSchema() calls, so a recomputed schema reuses the
+    // output uids already assigned to the same first-input fields.
+    private List<Pair<Long, Long>> uidMapping = new ArrayList<Pair<Long, Long>>();
+
+    /**
+     * @param plan the plan this operator belongs to
+     */
+    public LOUnion(OperatorPlan plan) {
+        super("LOUnion", plan);
+    }
+
+    /**
+     * Computes, and caches once computed, the union's schema. The result is null
+     * (and not cached) when any input schema is null, the inputs' field counts
+     * differ, or the first input's schema is not isEqual() to every other input's.
+     * Otherwise each field is a copy of the first input's field with an output
+     * uid taken from {@code uidMapping} or freshly allocated.
+     *
+     * @throws RuntimeException if the predecessors cannot be obtained or there are none
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOUnion.", e);
+        }
+        // Report a missing predecessor list the same way as a failed lookup,
+        // rather than failing later with an NPE or IndexOutOfBoundsException.
+        if (inputs == null || inputs.isEmpty()) {
+            throw new RuntimeException("Unable to get predecessor of LOUnion.");
+        }
+
+        // Fetch each input schema once. If any predecessor's schema is null, or the
+        // field counts differ, the schema for the union is null.
+        List<LogicalSchema> inputSchemas = new ArrayList<LogicalSchema>(inputs.size());
+        for (Operator input : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator) input).getSchema();
+            if (inputSchema == null) {
+                return null;
+            }
+            if (!inputSchemas.isEmpty() && inputSchema.size() != inputSchemas.get(0).size()) {
+                return null;
+            }
+            inputSchemas.add(inputSchema);
+        }
+
+        // Check if all predecessor's schema are compatible.
+        // TODO: Migrate all existing schema merging rules
+        LogicalSchema schema0 = inputSchemas.get(0);
+        for (int i = 1; i < inputSchemas.size(); i++) {
+            if (!schema0.isEqual(inputSchemas.get(i))) {
+                return null;
+            }
+        }
+
+        // Generate merged schema based on schema of first input. Build it locally
+        // and publish it to the cache only once it is complete.
+        LogicalSchema merged = new LogicalSchema();
+        for (int i = 0; i < schema0.size(); i++) {
+            LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(schema0.getField(i));
+            long uid = -1;
+            // Reuse the output uid already mapped to this first-input field, if any.
+            for (Pair<Long, Long> pair : uidMapping) {
+                if (pair.second == schema0.getField(i).uid) {
+                    uid = pair.first;
+                    break;
+                }
+            }
+            if (uid == -1) {
+                uid = LogicalExpression.getNextUid();
+                for (LogicalSchema inputSchema : inputSchemas) {
+                    uidMapping.add(new Pair<Long, Long>(uid, inputSchema.getField(i).uid));
+                }
+            }
+
+            fs.uid = uid;
+            merged.addField(fs);
+        }
+        schema = merged;
+        return schema;
+    }
+
+    /**
+     * Hands this operator to the visitor's visit method.
+     *
+     * @throws IOException if {@code v} is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor) v).visit(this);
+    }
+
+    /** Equality is decided by checkEquality() alone. */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other instanceof LOUnion) {
+            return checkEquality((LOUnion) other);
+        }
+        return false;
+    }
+
+    /**
+     * Returns the input uids mapped to the given output uid.
+     *
+     * @param uid an output field uid
+     * @return the matching input uids; an empty set if none are recorded
+     */
+    public Set<Long> getInputUids(long uid) {
+        Set<Long> result = new HashSet<Long>();
+        for (Pair<Long, Long> pair : uidMapping) {
+            if (pair.first == uid) {
+                result.add(pair.second);
+            }
+        }
+        return result;
+    }
+}