You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [19/23] - in /hadoop/pig/branches/load-store-redesign:
./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebr...
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Nov 24 19:54:19 2009
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -51,35 +52,35 @@
public class POUserFunc extends ExpressionOperator {
- /**
+ /**
*
*/
private static final long serialVersionUID = 1L;
transient EvalFunc func;
-
- transient private final Log log = LogFactory.getLog(getClass());
- FuncSpec funcSpec;
+
+ transient private final Log log = LogFactory.getLog(getClass());
+ FuncSpec funcSpec;
FuncSpec origFSpec;
- public static final byte INITIAL = 0;
- public static final byte INTERMEDIATE = 1;
- public static final byte FINAL = 2;
- private boolean initialized = false;
-
- public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
- super(k, rp);
- inputs = inp;
+ public static final byte INITIAL = 0;
+ public static final byte INTERMEDIATE = 1;
+ public static final byte FINAL = 2;
+ private boolean initialized = false;
- }
+ public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp);
+ inputs = inp;
- public POUserFunc(
+ }
+
+ public POUserFunc(
OperatorKey k,
int rp,
List<PhysicalOperator> inp,
FuncSpec funcSpec) {
- this(k, rp, inp, funcSpec, null);
- }
-
- public POUserFunc(
+ this(k, rp, inp, funcSpec, null);
+ }
+
+ public POUserFunc(
OperatorKey k,
int rp,
List<PhysicalOperator> inp,
@@ -87,60 +88,60 @@
EvalFunc func) {
super(k, rp);
super.setInputs(inp);
- this.funcSpec = funcSpec;
+ this.funcSpec = funcSpec;
this.origFSpec = funcSpec;
- this.func = func;
+ this.func = func;
instantiateFunc(funcSpec);
- }
+ }
- private void instantiateFunc(FuncSpec fSpec) {
- this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
- //the next couple of initializations do not work as intended for the following reasons
- //the reporter and pigLogger are member variables of PhysicalOperator
- //when instanitateFunc is invoked at deserialization time, both
- //reporter and pigLogger are null. They are set during map and reduce calls,
- //making the initializations here basically useless. Look at the processInput
- //method where these variables are re-initialized. At that point, the PhysicalOperator
- //is set up correctly with the reporter and pigLogger references
+ private void instantiateFunc(FuncSpec fSpec) {
+ this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+ //the next couple of initializations do not work as intended for the following reasons
+ //the reporter and pigLogger are member variables of PhysicalOperator
+ //when instantiateFunc is invoked at deserialization time, both
+ //reporter and pigLogger are null. They are set during map and reduce calls,
+ //making the initializations here basically useless. Look at the processInput
+ //method where these variables are re-initialized. At that point, the PhysicalOperator
+ //is set up correctly with the reporter and pigLogger references
this.func.setReporter(reporter);
this.func.setPigLogger(pigLogger);
- }
-
- public Result processInput() throws ExecException {
+ }
+
+ public Result processInput() throws ExecException {
// Make sure the reporter is set, because it isn't getting carried
// across in the serialization (don't know why). I suspect it's as
// cheap to call the setReporter call every time as to check whether I
// have (hopefully java will inline it).
if(!initialized) {
- func.setReporter(reporter);
- func.setPigLogger(pigLogger);
- initialized = true;
+ func.setReporter(reporter);
+ func.setPigLogger(pigLogger);
+ initialized = true;
}
- Result res = new Result();
- Tuple inpValue = null;
- if (input == null && (inputs == null || inputs.size()==0)) {
+ Result res = new Result();
+ Tuple inpValue = null;
+ if (input == null && (inputs == null || inputs.size()==0)) {
// log.warn("No inputs found. Signaling End of Processing.");
- res.returnStatus = POStatus.STATUS_EOP;
- return res;
- }
-
- //Should be removed once the model is clear
- if(reporter!=null) reporter.progress();
-
-
- if(isInputAttached()) {
- res.result = input;
- res.returnStatus = POStatus.STATUS_OK;
- detachInput();
- return res;
- } else {
- res.result = TupleFactory.getInstance().newTuple();
-
- Result temp = null;
- for(PhysicalOperator op : inputs) {
- switch(op.getResultType()){
+ res.returnStatus = POStatus.STATUS_EOP;
+ return res;
+ }
+
+ //Should be removed once the model is clear
+ if(reporter!=null) reporter.progress();
+
+
+ if(isInputAttached()) {
+ res.result = input;
+ res.returnStatus = POStatus.STATUS_OK;
+ detachInput();
+ return res;
+ } else {
+ res.result = TupleFactory.getInstance().newTuple();
+
+ Result temp = null;
+ for(PhysicalOperator op : inputs) {
+ switch(op.getResultType()){
case DataType.BAG:
temp = op.getNext(dummyBag);
break;
@@ -187,210 +188,222 @@
}
}
((Tuple)res.result).append(temp.result);
- }
- res.returnStatus = temp.returnStatus;
- return res;
- }
- }
+ }
+ res.returnStatus = temp.returnStatus;
+ return res;
+ }
+ }
- private Result getNext() throws ExecException {
- Result result = processInput();
+ private Result getNext() throws ExecException {
+ Result result = processInput();
String errMsg = "";
- try {
- if(result.returnStatus == POStatus.STATUS_OK) {
- result.result = func.exec((Tuple) result.result);
+ try {
+ if(result.returnStatus == POStatus.STATUS_OK) {
+ if (isAccumulative()) {
+ if (isAccumStarted()) {
+ ((Accumulator)func).accumulate((Tuple)result.result);
+ result.returnStatus = POStatus.STATUS_BATCH_OK;
+ result.result = null;
+ }else{
+ result.result = ((Accumulator)func).getValue();
+ ((Accumulator)func).cleanup();
+ }
+ } else {
+ result.result = func.exec((Tuple) result.result);
+ }
if(resultType == DataType.BYTEARRAY) {
if(res.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
result.result = new DataByteArray(result.result.toString().getBytes());
}
}
- return result;
- }
- return result;
- } catch (ExecException ee) {
- throw ee;
- } catch (IOException ioe) {
- int errCode = 2078;
- String msg = "Caught error from UDF: " + funcSpec.getClassName();
+ return result;
+ }
+
+ return result;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (IOException ioe) {
+ int errCode = 2078;
+ String msg = "Caught error from UDF: " + funcSpec.getClassName();
String footer = " [" + ioe.getMessage() + "]";
-
- if(ioe instanceof PigException) {
- int udfErrorCode = ((PigException)ioe).getErrorCode();
- if(udfErrorCode != 0) {
- errCode = udfErrorCode;
- msg = ((PigException)ioe).getMessage();
- } else {
- msg += " [" + ((PigException)ioe).getMessage() + " ]";
- }
- } else {
- msg += footer;
- }
-
- throw new ExecException(msg, errCode, PigException.BUG, ioe);
- } catch (IndexOutOfBoundsException ie) {
+
+ if(ioe instanceof PigException) {
+ int udfErrorCode = ((PigException)ioe).getErrorCode();
+ if(udfErrorCode != 0) {
+ errCode = udfErrorCode;
+ msg = ((PigException)ioe).getMessage();
+ } else {
+ msg += " [" + ((PigException)ioe).getMessage() + " ]";
+ }
+ } else {
+ msg += footer;
+ }
+
+ throw new ExecException(msg, errCode, PigException.BUG, ioe);
+ } catch (IndexOutOfBoundsException ie) {
int errCode = 2078;
String msg = "Caught error from UDF: " + funcSpec.getClassName() +
", Out of bounds access [" + ie.getMessage() + "]";
throw new ExecException(msg, errCode, PigException.BUG, ie);
- }
- }
+ }
+ }
- @Override
- public Result getNext(Tuple tIn) throws ExecException {
- return getNext();
- }
+ @Override
+ public Result getNext(Tuple tIn) throws ExecException {
+ return getNext();
+ }
- @Override
- public Result getNext(DataBag db) throws ExecException {
- return getNext();
- }
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
- @Override
- public Result getNext(Integer i) throws ExecException {
- return getNext();
- }
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ return getNext();
+ }
- @Override
- public Result getNext(Boolean b) throws ExecException {
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(DataByteArray ba) throws ExecException {
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Double d) throws ExecException {
+ @Override
+ public Result getNext(Double d) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Float f) throws ExecException {
+ @Override
+ public Result getNext(Float f) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Long l) throws ExecException {
+ @Override
+ public Result getNext(Long l) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(Map m) throws ExecException {
+ @Override
+ public Result getNext(Map m) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- @Override
- public Result getNext(String s) throws ExecException {
+ @Override
+ public Result getNext(String s) throws ExecException {
- return getNext();
- }
+ return getNext();
+ }
- public void setAlgebraicFunction(byte Function) throws ExecException {
- // This will only be used by the optimizer for putting correct functions
- // in the mapper,
- // combiner and reduce. This helps in maintaining the physical plan as
- // is without the
- // optimiser having to replace any operators.
- // You wouldn't be able to make two calls to this function on the same
- // algebraic EvalFunc as
- // func is being changed.
- switch (Function) {
- case INITIAL:
+ public void setAlgebraicFunction(byte Function) throws ExecException {
+ // This will only be used by the optimizer for putting correct functions
+ // in the mapper,
+ // combiner and reduce. This helps in maintaining the physical plan as
+ // is without the
+ // optimiser having to replace any operators.
+ // You wouldn't be able to make two calls to this function on the same
+ // algebraic EvalFunc as
+ // func is being changed.
+ switch (Function) {
+ case INITIAL:
funcSpec = new FuncSpec(getInitial());
- break;
- case INTERMEDIATE:
+ break;
+ case INTERMEDIATE:
funcSpec = new FuncSpec(getIntermed());
- break;
- case FINAL:
+ break;
+ case FINAL:
funcSpec = new FuncSpec(getFinal());
- break;
+ break;
- }
+ }
instantiateFunc(funcSpec);
setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
- }
+ }
- public String getInitial() throws ExecException {
- instantiateFunc(origFSpec);
- if (func instanceof Algebraic) {
- return ((Algebraic) func).getInitial();
- } else {
- int errCode = 2072;
- String msg = "Attempt to run a non-algebraic function"
+ public String getInitial() throws ExecException {
+ instantiateFunc(origFSpec);
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getInitial();
+ } else {
+ int errCode = 2072;
+ String msg = "Attempt to run a non-algebraic function"
+ " as an algebraic function";
throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
+ }
+ }
- public String getIntermed() throws ExecException {
+ public String getIntermed() throws ExecException {
instantiateFunc(origFSpec);
- if (func instanceof Algebraic) {
- return ((Algebraic) func).getIntermed();
- } else {
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getIntermed();
+ } else {
int errCode = 2072;
String msg = "Attempt to run a non-algebraic function"
+ " as an algebraic function";
throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
+ }
+ }
- public String getFinal() throws ExecException {
+ public String getFinal() throws ExecException {
instantiateFunc(origFSpec);
- if (func instanceof Algebraic) {
- return ((Algebraic) func).getFinal();
- } else {
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getFinal();
+ } else {
int errCode = 2072;
String msg = "Attempt to run a non-algebraic function"
+ " as an algebraic function";
throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
+ }
+ }
- public Type getReturnType() {
- return func.getReturnType();
- }
+ public Type getReturnType() {
+ return func.getReturnType();
+ }
- public void finish() {
- func.finish();
- }
+ public void finish() {
+ func.finish();
+ }
- public Schema outputSchema(Schema input) {
- return func.outputSchema(input);
- }
+ public Schema outputSchema(Schema input) {
+ return func.outputSchema(input);
+ }
- public Boolean isAsynchronous() {
- return func.isAsynchronous();
- }
+ public Boolean isAsynchronous() {
+ return func.isAsynchronous();
+ }
- @Override
- public String name() {
- return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
- }
+ @Override
+ public String name() {
+ return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+ }
- @Override
- public boolean supportsMultipleInputs() {
+ @Override
+ public boolean supportsMultipleInputs() {
- return true;
- }
+ return true;
+ }
- @Override
- public boolean supportsMultipleOutputs() {
+ @Override
+ public boolean supportsMultipleOutputs() {
- return false;
- }
+ return false;
+ }
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
- v.visitUserFunc(this);
- }
+ v.visitUserFunc(this);
+ }
public FuncSpec getFuncSpec() {
return funcSpec;
@@ -414,4 +427,12 @@
is.defaultReadObject();
instantiateFunc(funcSpec);
}
+
+ /**
+ * Get child expression of this expression
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java Tue Nov 24 19:54:19 2009
@@ -53,6 +53,11 @@
@Override
public Result getNext(Double d) throws ExecException {
+ Result r = accumChild(null, d);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Double left = null, right = null;
@@ -76,6 +81,11 @@
@Override
public Result getNext(Float f) throws ExecException {
+ Result r = accumChild(null, f);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Float left = null, right = null;
@@ -99,6 +109,11 @@
@Override
public Result getNext(Integer i) throws ExecException {
+ Result r = accumChild(null, i);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Integer left = null, right = null;
@@ -122,6 +137,11 @@
@Override
public Result getNext(Long l) throws ExecException {
+ Result r = accumChild(null, l);
+ if (r != null) {
+ return r;
+ }
+
byte status;
Result res;
Long left = null, right = null;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java Tue Nov 24 19:54:19 2009
@@ -17,6 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -26,6 +29,7 @@
public abstract class UnaryExpressionOperator extends ExpressionOperator {
ExpressionOperator expr;
+ private transient List<ExpressionOperator> child;
public UnaryExpressionOperator(OperatorKey k, int rp) {
super(k, rp);
@@ -73,4 +77,17 @@
resultType = op.getResultType();
}
+ /**
+ * Get child expression of this expression
+ */
+ @Override
+ public List<ExpressionOperator> getChildExpressions() {
+ if (child == null) {
+ child = new ArrayList<ExpressionOperator>();
+ child.add(expr);
+ }
+
+ return child;
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Tue Nov 24 19:54:19 2009
@@ -204,12 +204,6 @@
}
@Override
- public boolean equals(Object obj) {
- // TODO Auto-generated method stub
- return super.equals(obj);
- }
-
- @Override
public PhysicalPlan clone() throws CloneNotSupportedException {
PhysicalPlan clone = new PhysicalPlan();
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is used during the Reduce phase to process tuples
+ * in batch mode. It is used by POPackage when all of the UDFs can be
+ * called in accumulative mode. Tuples are not pulled all at once;
+ * instead, each time, only a specified number of tuples are pulled out
+ * of the iterator and put in a buffer. Then this buffer is wrapped into
+ * a bag to be passed to the operators in the reduce plan.
+ *
+ * The purpose of doing this is to reduce memory usage and avoid spilling.
+ */
+public interface AccumulativeTupleBuffer {
+
+ /**
+ * Pull next batch of tuples from iterator and put them into this buffer
+ */
+ public void nextBatch() throws IOException;
+
+ /**
+ * Whether there are more tuples to pull out of iterator
+ */
+ public boolean hasNextBatch() ;
+
+ /**
+ * Clear the internal buffer. This should be called after all data have been retrieved.
+ */
+ public void clear();
+
+ /**
+ * Get iterator of tuples in the buffer
+ * @param index the index of tuple
+ */
+ public Iterator<Tuple> getTuples(int index);
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Tue Nov 24 19:54:19 2009
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -61,14 +62,7 @@
private static Result eop = new Result(POStatus.STATUS_EOP, null);
transient private Log log = LogFactory.getLog(getClass());
-
- /*
- * The base index of this demux. In the case of
- * a demux contained in another demux, the index
- * passed in must be shifted before it can be used.
- */
- private int baseIndex = 0;
-
+
/*
* The list of sub-plans the inner plan is composed of
*/
@@ -178,7 +172,7 @@
@Override
public String name() {
- return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " + mKey.toString();
+ return "Demux" + isKeyWrapped + " - " + mKey.toString();
}
@Override
@@ -190,32 +184,35 @@
public boolean supportsMultipleOutputs() {
return false;
}
-
+
/**
- * Sets the base index of this demux.
- *
- * @param idx the base index
+ * Returns the list of inner plans.
+ *
+ * @return the list of the nested plans
*/
- public void setBaseIndex(int idx) {
- baseIndex = idx;
+ public List<PhysicalPlan> getPlans() {
+ return myPlans;
}
/**
- * Returns the base index of this demux
+ * Returns the list of booleans that indicates if the
+ * key needs to be unwrapped for the corresponding plan.
*
- * @return the base index
+ * @return the list of isKeyWrapped boolean values
*/
- public int getBaseIndex() {
- return baseIndex;
+ public List<Boolean> getIsKeyWrappedList() {
+ return Collections.unmodifiableList(isKeyWrapped);
}
/**
- * Returns the list of inner plans.
- *
- * @return the list of the nested plans
+ * Adds a list of IsKeyWrapped boolean values
+ *
+ * @param lst the list of boolean values to add
*/
- public List<PhysicalPlan> getPlans() {
- return myPlans;
+ public void addIsKeyWrappedList(List<Boolean> lst) {
+ for (Boolean b : lst) {
+ isKeyWrapped.add(b);
+ }
}
/**
@@ -232,6 +229,12 @@
isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
keyPositions.add(keyPos);
}
+
+ public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) {
+ myPlans.add(inPlan);
+ processedSet.set(myPlans.size()-1);
+ keyPositions.add(keyPos);
+ }
@Override
public Result getNext(Tuple t) throws ExecException {
@@ -357,8 +360,7 @@
// the POLocalRearrange operator and passed to this operator
// by POMultiQueryPackage
int index = fld.getIndex();
- index &= idxPart;
- index -= baseIndex;
+ index &= idxPart;
PhysicalPlan pl = myPlans.get(index);
if (!(pl.getRoots().get(0) instanceof PODemux)) {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Tue Nov 24 19:54:19 2009
@@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
@@ -39,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -49,19 +49,18 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-
/**
- * The operator models the join keys using the Local Rearrange operators which
- * are configured with the plan specified by the user. It also sets up
- * one Hashtable per replicated input which maps the Key(k) stored as a Tuple
- * to a DataBag which holds all the values in the input having the same key(k)
- * The getNext() reads an input from its predecessor and separates them into
- * key & value. It configures a foreach operator with the databags obtained from
- * each Hashtable for the key and also with the value for the fragment input.
- * It then returns tuples returned by this foreach operator.
+ * The operator models the join keys using the Local Rearrange operators which
+ * are configured with the plan specified by the user. It also sets up one
+ * Hashtable per replicated input which maps the Key(k) stored as a Tuple to a
+ * DataBag which holds all the values in the input having the same key(k). The
+ * getNext() reads an input from its predecessor and separates them into key &
+ * value. It configures a foreach operator with the databags obtained from each
+ * Hashtable for the key and also with the value for the fragment input. It then
+ * returns tuples returned by this foreach operator.
*/
-//We intentionally skip type checking in backend for performance reasons
+// We intentionally skip type checking in backend for performance reasons
@SuppressWarnings("unchecked")
public class POFRJoin extends PhysicalOperator {
/**
@@ -69,35 +68,48 @@
*/
private static final long serialVersionUID = 1L;
static private Log log = LogFactory.getLog(POFRJoin.class);
- //The number in the input list which denotes the fragmented input
+ // The number in the input list which denotes the fragmented input
private int fragment;
- //There can be n inputs each being a List<PhysicalPlan>
- //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
+ // There can be n inputs each being a List<PhysicalPlan>
+ // Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
private List<List<PhysicalPlan>> phyPlanLists;
- //The key type for each Local Rearrange operator
+ // The key type for each Local Rearrange operator
private List<List<Byte>> keyTypes;
- //The Local Rearrange operators modeling the join key
+ // The Local Rearrange operators modeling the join key
private POLocalRearrange[] LRs;
- //The set of files that represent the replicated inputs
+ // The set of files that represent the replicated inputs
private FileSpec[] replFiles;
- //Used to configure the foreach operator
+ // Used to configure the foreach operator
private ConstantExpression[] constExps;
- //Used to produce the cross product of various bags
+ // Used to produce the cross product of various bags
private POForEach fe;
- //The array of Hashtables one per replicated input. replicates[fragment] = null
- private Map<Tuple,List<Tuple>> replicates[];
- //varaible which denotes whether we are returning tuples from the foreach operator
+ // The array of Hashtables one per replicated input. replicates[fragment] =
+ // null
+ // fragment is the input which is fragmented and not replicated.
+ private Map<Tuple, List<Tuple>> replicates[];
+ // variable which denotes whether we are returning tuples from the foreach
+ // operator
private boolean processingPlan;
- //A dummy tuple
+ // A dummy tuple
private Tuple dumTup = TupleFactory.getInstance().newTuple(1);
- //An instance of tuple factory
+ // An instance of tuple factory
private transient TupleFactory mTupleFactory;
private transient BagFactory mBagFactory;
private boolean setUp;
-
- public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
- super(k,rp,inp);
-
+ // A Boolean variable which denotes if this is a LeftOuter Join or an Inner
+ // Join
+ private boolean isLeftOuterJoin;
+
+ // This list contains nullTuples according to schema of various inputs
+ private DataBag nullBag;
+
+ public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
+ List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
+ FileSpec[] replFiles, int fragment, boolean isLeftOuter,
+ Tuple nullTuple)
+ throws ExecException {
+ super(k, rp, inp);
+
phyPlanLists = ppLists;
this.fragment = fragment;
this.keyTypes = keyTypes;
@@ -109,32 +121,39 @@
processingPlan = false;
mTupleFactory = TupleFactory.getInstance();
mBagFactory = BagFactory.getInstance();
+ List<Tuple> tupList = new ArrayList<Tuple>();
+ tupList.add(nullTuple);
+ nullBag = mBagFactory.newDefaultBag(tupList);
+ this.isLeftOuterJoin = isLeftOuter;
}
-
- public List<List<PhysicalPlan>> getJoinPlans(){
+
+ public List<List<PhysicalPlan>> getJoinPlans() {
return phyPlanLists;
}
-
- private OperatorKey genKey(OperatorKey old){
- return new OperatorKey(old.scope,NodeIdGenerator.getGenerator().getNextNodeId(old.scope));
+
+ private OperatorKey genKey(OperatorKey old) {
+ return new OperatorKey(old.scope, NodeIdGenerator.getGenerator()
+ .getNextNodeId(old.scope));
}
-
+
/**
* Configures the Local Rearrange operators & the foreach operator
+ *
* @param old
- * @throws ExecException
+ * @throws ExecException
*/
- private void createJoinPlans(OperatorKey old) throws ExecException{
+ private void createJoinPlans(OperatorKey old) throws ExecException {
List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
List<Boolean> flatList = new ArrayList<Boolean>();
-
- int i=-1;
+
+ int i = -1;
for (List<PhysicalPlan> ppLst : phyPlanLists) {
++i;
POLocalRearrange lr = new POLocalRearrange(genKey(old));
lr.setIndex(i);
lr.setResultType(DataType.TUPLE);
- lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE : keyTypes.get(i).get(0));
+ lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE
+ : keyTypes.get(i).get(0));
try {
lr.setPlans(ppLst);
} catch (PlanException pe) {
@@ -142,18 +161,22 @@
String msg = "Problem with setting up local rearrange's plans.";
throw new ExecException(msg, errCode, PigException.BUG, pe);
}
- LRs[i]= lr;
+ LRs[i] = lr;
ConstantExpression ce = new ConstantExpression(genKey(old));
- ce.setResultType((i==fragment)?DataType.TUPLE:DataType.BAG);
+ ce.setResultType((i == fragment) ? DataType.TUPLE : DataType.BAG);
constExps[i] = ce;
PhysicalPlan pp = new PhysicalPlan();
pp.add(ce);
fePlans.add(pp);
flatList.add(true);
}
- fe = new POForEach(genKey(old),-1,fePlans,flatList);
+ // The ForEach operator here is used for generating a Cross-Product
+ // It is given a set of constant expressions with
+ // Tuple,(Bag|Tuple),(...)
+ // It does a cross product on that and produces output.
+ fe = new POForEach(genKey(old), -1, fePlans, flatList);
}
-
+
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitFRJoin(this);
@@ -161,18 +184,17 @@
@Override
public String name() {
- return "FRJoin[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+ return "FRJoin[" + DataType.findTypeName(resultType) + "]" + " - "
+ + mKey.toString();
}
@Override
public boolean supportsMultipleInputs() {
- // TODO Auto-generated method stub
return true;
}
@Override
public boolean supportsMultipleOutputs() {
- // TODO Auto-generated method stub
return false;
}
@@ -180,34 +202,36 @@
public Result getNext(Tuple t) throws ExecException {
Result res = null;
Result inp = null;
- if(!setUp){
+ if (!setUp) {
setUpHashMap();
setUp = true;
}
- if(processingPlan){
- //Return tuples from the for each operator
- //Assumes that it is configured appropriately with
- //the bags for the current key.
- while(true) {
+ if (processingPlan) {
+ // Return tuples from the for each operator
+ // Assumes that it is configured appropriately with
+ // the bags for the current key.
+ while (true) {
res = fe.getNext(dummyTuple);
-
- if(res.returnStatus==POStatus.STATUS_OK){
+
+ if (res.returnStatus == POStatus.STATUS_OK) {
return res;
}
- if(res.returnStatus==POStatus.STATUS_EOP){
- processingPlan = false;
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ // We have completed all cross-products; now it's time to move
+ // to the next tuple of the left side
+ processingPlan = false;
break;
}
- if(res.returnStatus==POStatus.STATUS_ERR) {
+ if (res.returnStatus == POStatus.STATUS_ERR) {
return res;
}
- if(res.returnStatus==POStatus.STATUS_NULL) {
+ if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
}
}
}
while (true) {
- //Process the current input
+ // Process the current input
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP
|| inp.returnStatus == POStatus.STATUS_ERR)
@@ -215,99 +239,128 @@
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
-
- //Separate Key & Value using the fragment's LR operator
+
+ // Separate Key & Value using the fragment's LR operator
POLocalRearrange lr = LRs[fragment];
- lr.attachInput((Tuple)inp.result);
+ lr.attachInput((Tuple) inp.result);
Result lrOut = lr.getNext(dummyTuple);
- if(lrOut.returnStatus!=POStatus.STATUS_OK) {
- log.error("LocalRearrange isn't configured right or is not working");
+ if (lrOut.returnStatus != POStatus.STATUS_OK) {
+ log
+ .error("LocalRearrange isn't configured right or is not working");
return new Result();
}
Tuple lrOutTuple = (Tuple) lrOut.result;
Tuple key = TupleFactory.getInstance().newTuple(1);
- key.set(0,lrOutTuple.get(1));
+ key.set(0, lrOutTuple.get(1));
Tuple value = getValueTuple(lr, lrOutTuple);
-
- //Configure the for each operator with the relevant bags
- int i=-1;
+
+ // Configure the for each operator with the relevant bags
+ int i = -1;
boolean noMatch = false;
for (ConstantExpression ce : constExps) {
++i;
- if(i==fragment){
+ if (i == fragment) {
+ // We set the first CE as the tuple from fragmented Left
ce.setValue(value);
continue;
}
Map<Tuple, List<Tuple>> replicate = replicates[i];
- if(!replicate.containsKey(key)){
+ if (!replicate.containsKey(key)) {
+ if (isLeftOuterJoin) {
+ ce.setValue(nullBag);
+ }
noMatch = true;
break;
}
ce.setValue(mBagFactory.newDefaultBag(replicate.get(key)));
}
- if(noMatch)
+
+ // If this is not LeftOuter Join and there was no match we
+ // skip the processing of this left tuple and move ahead
+ if (!isLeftOuterJoin && noMatch)
continue;
fe.attachInput(dumTup);
processingPlan = true;
-
+
+ // We are all set, we call getNext (this function) which will call
+ // getNext on ForEach
+ // And that will return one tuple of Cross-Product between set
+ // constant Expressions
+ // All subsequent calls ( by parent ) to this function will return
+ // next tuple of crossproduct
Result gn = getNext(dummyTuple);
+
return gn;
}
}
/**
- * Builds the HashMaps by reading each replicated input from the DFS
- * using a Load operator
+ * Builds the HashMaps by reading each replicated input from the DFS using a
+ * Load operator
+ *
* @throws ExecException
*/
private void setUpHashMap() throws ExecException {
- int i=-1;
+ int i = -1;
long time1 = System.currentTimeMillis();
for (FileSpec replFile : replFiles) {
++i;
- if(i==fragment){
+
+ if (i == fragment) {
replicates[i] = null;
continue;
}
- POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
- PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+ POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
+ replFile, false);
+ PigContext pc = new PigContext(ExecType.MAPREDUCE,
+ ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
pc.connect();
ld.setPc(pc);
+ // We use LocalRearrange Operator to separate Key and Values
+ // eg. ( a, b, c ) would generate a, ( a, b, c )
+ // And we use 'a' as the key to the HashMap
+ // The rest '( a, b, c )' is added to HashMap as value
+ // We could have manually done this, but LocalRearrange does the
+ // same thing, so utilizing its functionality
POLocalRearrange lr = LRs[i];
- lr.setInputs(Arrays.asList((PhysicalOperator)ld));
- Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(1000);
+ lr.setInputs(Arrays.asList((PhysicalOperator) ld));
+ Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(
+ 1000);
log.debug("Completed setup. Trying to build replication hash table");
int cnt = 0;
- for(Result res=lr.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(dummyTuple)){
+ for (Result res = lr.getNext(dummyTuple);res.returnStatus != POStatus.STATUS_EOP;res = lr.getNext(dummyTuple)) {
++cnt;
- if(reporter!=null) reporter.progress();
+ if (reporter != null)
+ reporter.progress();
Tuple tuple = (Tuple) res.result;
Tuple key = mTupleFactory.newTuple(1);
- key.set(0,tuple.get(1));
+ key.set(0, tuple.get(1));
Tuple value = getValueTuple(lr, tuple);
- if(!replicate.containsKey(key))
+ if (!replicate.containsKey(key))
replicate.put(key, new ArrayList<Tuple>());
replicate.get(key).add(value);
}
replicates[i] = replicate;
}
- long time2 = System.currentTimeMillis();
- log.debug("Hash Table built. Time taken: " + (time2-time1));
+ long time2 = System.currentTimeMillis();
+ log.debug("Hash Table built. Time taken: " + (time2 - time1));
}
-
- private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+
+ private void readObject(ObjectInputStream is) throws IOException,
+ ClassNotFoundException, ExecException {
is.defaultReadObject();
mTupleFactory = TupleFactory.getInstance();
mBagFactory = BagFactory.getInstance();
-// setUpHashTable();
+ // setUpHashTable();
}
-
+
/*
* Extracts the value tuple from the LR operator's output tuple
*/
- private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple) throws ExecException {
+ private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple)
+ throws ExecException {
Tuple val = (Tuple) tuple.get(2);
Tuple retTup = null;
boolean isProjectStar = lr.isProjectStar();
@@ -315,18 +368,18 @@
int keyLookupSize = keyLookup.size();
Object key = tuple.get(1);
boolean isKeyTuple = lr.isKeyTuple();
- Tuple keyAsTuple = isKeyTuple ? (Tuple)tuple.get(1) : null;
- if( keyLookupSize > 0) {
-
+ Tuple keyAsTuple = isKeyTuple ? (Tuple) tuple.get(1) : null;
+ if (keyLookupSize > 0) {
+
// we have some fields of the "value" in the
// "key".
retTup = mTupleFactory.newTuple();
int finalValueSize = keyLookupSize + val.size();
- int valIndex = 0; // an index for accessing elements from
- // the value (val) that we have currently
- for(int i = 0; i < finalValueSize; i++) {
+ int valIndex = 0; // an index for accessing elements from
+ // the value (val) that we have currently
+ for (int i = 0; i < finalValueSize; i++) {
Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
+ if (keyIndex == null) {
// the field for this index is not in the
// key - so just take it from the "value"
// we were handed
@@ -334,7 +387,7 @@
valIndex++;
} else {
// the field for this index is in the key
- if(isKeyTuple) {
+ if (isKeyTuple) {
// the key is a tuple, extract the
// field out of the tuple
retTup.append(keyAsTuple.get(keyIndex));
@@ -343,19 +396,19 @@
}
}
}
-
+
} else if (isProjectStar) {
-
+
// the whole "value" is present in the "key"
retTup = mTupleFactory.newTuple(keyAsTuple.getAll());
-
+
} else {
-
+
// there is no field of the "value" in the
// "key" - so just make a copy of what we got
// as the "value"
retTup = mTupleFactory.newTuple(val.getAll());
-
+
}
return retTup;
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue Nov 24 19:54:19 2009
@@ -29,6 +29,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -37,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -59,7 +61,7 @@
protected List<PhysicalPlan> inputPlans;
protected List<PhysicalOperator> opsToBeReset;
transient protected Log log = LogFactory.getLog(getClass());
- protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
//Since the plan has a generate, this needs to be maintained
//as the generate can potentially return multiple tuples for
//same call.
@@ -89,6 +91,8 @@
protected PhysicalOperator[] planLeafOps = null;
+ protected transient AccumulativeTupleBuffer buffer;
+
public POForEach(OperatorKey k) {
this(k,-1,null,null);
}
@@ -146,15 +150,54 @@
@Override
public boolean supportsMultipleOutputs() {
return false;
+ }
+
+ public void setAccumulative() {
+ super.setAccumulative();
+ for(PhysicalPlan p : inputPlans) {
+ Iterator<PhysicalOperator> iter = p.iterator();
+ while(iter.hasNext()) {
+ PhysicalOperator po = iter.next();
+ if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+ po.setAccumulative();
+ }
+ }
+ }
+ }
+
+ public void setAccumStart() {
+ super.setAccumStart();
+ for(PhysicalPlan p : inputPlans) {
+ Iterator<PhysicalOperator> iter = p.iterator();
+ while(iter.hasNext()) {
+ PhysicalOperator po = iter.next();
+ if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+ po.setAccumStart();
+ }
+ }
+ }
}
+ public void setAccumEnd() {
+ super.setAccumEnd();
+ for(PhysicalPlan p : inputPlans) {
+ Iterator<PhysicalOperator> iter = p.iterator();
+ while(iter.hasNext()) {
+ PhysicalOperator po = iter.next();
+ if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+ po.setAccumEnd();
+ }
+ }
+ }
+ }
+
/**
* Calls getNext on the generate operator inside the nested
* physical plan and returns it maintaining an additional state
* to denote the begin and end of the nested plan processing.
*/
@Override
- public Result getNext(Tuple t) throws ExecException {
+ public Result getNext(Tuple t) throws ExecException {
Result res = null;
Result inp = null;
//The nested plan is under processing
@@ -162,14 +205,15 @@
//returns
if(processingPlan){
while(true) {
- res = processPlan();
+ res = processPlan();
+
if(res.returnStatus==POStatus.STATUS_OK) {
if(lineageTracer != null && res.result != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ tOut.synthetic = tIn.synthetic;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tIn);
+ res.result = tOut;
}
return res;
}
@@ -198,32 +242,71 @@
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
-
+
attachInputToPlans((Tuple) inp.result);
+ Tuple tuple = (Tuple)inp.result;
+
for (PhysicalOperator po : opsToBeReset) {
po.reset();
}
- res = processPlan();
+
+ if (isAccumulative()) {
+ for(int i=0; i<tuple.size(); i++) {
+ if (tuple.getType(i) == DataType.BAG) {
+ // we only need to check one bag, because all the bags
+ // share the same buffer
+ buffer = ((AccumulativeBag)tuple.get(i)).getTuplebuffer();
+ break;
+ }
+ }
+
+ while(true) {
+ if (buffer.hasNextBatch()) {
+ try {
+ buffer.nextBatch();
+ }catch(IOException e) {
+ throw new ExecException(e);
+ }
+
+ setAccumStart();
+ }else{
+ buffer.clear();
+ setAccumEnd();
+ }
+
+ res = processPlan();
+
+ if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
+ // attach same input again to process next batch
+ attachInputToPlans((Tuple) inp.result);
+ } else {
+ break;
+ }
+ }
+
+ } else {
+ res = processPlan();
+ }
processingPlan = true;
if(lineageTracer != null && res.result != null) {
- //we check for res.result since that can also be null in the case of flatten
- tIn = (ExampleTuple) inp.result;
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
+ //we check for res.result since that can also be null in the case of flatten
+ tIn = (ExampleTuple) inp.result;
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ tOut.synthetic = tIn.synthetic;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tIn);
+ res.result = tOut;
}
return res;
}
}
- protected Result processPlan() throws ExecException{
+ protected Result processPlan() throws ExecException{
Result res = new Result();
-
+
//We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
if(its != null) {
boolean restartIts = true;
@@ -238,6 +321,7 @@
}
}
+
if(its == null) {
//getNext being called for the first time OR starting with a set of new data from inputs
its = new Iterator[noItems];
@@ -287,9 +371,13 @@
}
}
-
+
+ if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {
+ continue;
+ }
+
if(inputData.returnStatus == POStatus.STATUS_EOP) {
- //we are done with all the elements. Time to return.
+ //we are done with all the elements. Time to return.
its = null;
bags = null;
return inputData;
@@ -310,6 +398,11 @@
}
}
+ // if accumulating, we haven't got data yet for some fields, just return
+ if (isAccumulative() && isAccumStarted()) {
+ res.returnStatus = POStatus.STATUS_BATCH_OK;
+ return res;
+ }
while(true) {
if(data == null) {
@@ -396,13 +489,13 @@
protected void attachInputToPlans(Tuple t) {
- //super.attachInput(t);
- for(PhysicalPlan p : inputPlans) {
+ //super.attachInput(t);
+ for(PhysicalPlan p : inputPlans) {
p.attachInput(t);
}
}
- protected void getLeaves() {
+ public void getLeaves() {
if (inputPlans != null) {
int i=-1;
if(isToBeFlattenedArray == null) {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Tue Nov 24 19:54:19 2009
@@ -55,18 +55,26 @@
*/
protected static final long serialVersionUID = 1L;
- protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
transient private Log log = LogFactory.getLog(getClass());
protected List<PhysicalPlan> plans;
+ protected List<PhysicalPlan> secondaryPlans;
+
protected List<ExpressionOperator> leafOps;
+
+ protected List<ExpressionOperator> secondaryLeafOps;
// The position of this LR in the package operator
protected byte index;
protected byte keyType;
+
+ protected byte mainKeyType;
+
+ protected byte secondaryKeyType;
protected boolean mIsDistinct = false;
@@ -83,6 +91,7 @@
// 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 corresponds to 1st index in key)
// 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 0 corresponds to 2nd index in key)
private Map<Integer, Integer> mProjectedColsMap;
+ private Map<Integer, Integer> mSecondaryProjectedColsMap;
// A place holder Tuple used in distinct case where we really don't
// have any value to pass through. But hadoop gets cranky if we pass a
@@ -95,20 +104,25 @@
// is a project(*) - we set this ONLY when the project(*)
// is the ONLY thing in the cogroup by ..
private boolean mProjectStar = false;
+ private boolean mSecondaryProjectStar = false;
// marker to note that the "key" is a tuple
// this is required by POPackage to pick things
// off the "key" correctly to stitch together the
// "value"
private boolean isKeyTuple = false;
+ private boolean isSecondaryKeyTuple = false;
private int mProjectedColsMapSize = 0;
+ private int mSecondaryProjectedColsMapSize = 0;
private ArrayList<Integer> minValuePositions;
private int minValuePositionsSize = 0;
private Tuple lrOutput;
+ private boolean useSecondaryKey = false;
+
public POLocalRearrange(OperatorKey k) {
this(k, -1, null);
}
@@ -125,7 +139,9 @@
super(k, rp, inp);
index = -1;
leafOps = new ArrayList<ExpressionOperator>();
+ secondaryLeafOps = new ArrayList<ExpressionOperator>();
mProjectedColsMap = new HashMap<Integer, Integer>();
+ mSecondaryProjectedColsMap = new HashMap<Integer, Integer>();
lrOutput = mTupleFactory.newTuple(3);
}
@@ -246,7 +262,19 @@
for (PhysicalPlan ep : plans) {
ep.attachInput((Tuple)inp.result);
}
+
List<Result> resLst = new ArrayList<Result>();
+
+ if (secondaryPlans!=null) {
+ for (PhysicalPlan ep : secondaryPlans) {
+ ep.attachInput((Tuple)inp.result);
+ }
+ }
+
+ List<Result> secondaryResLst = null;
+ if (secondaryLeafOps!=null)
+ secondaryResLst = new ArrayList<Result>();
+
for (ExpressionOperator op : leafOps){
switch(op.getResultType()){
@@ -285,24 +313,66 @@
return new Result();
resLst.add(res);
}
- res.result = constructLROutput(resLst,(Tuple)inp.result);
+
+ if (secondaryLeafOps!=null)
+ {
+ for (ExpressionOperator op : secondaryLeafOps){
+
+ switch(op.getResultType()){
+ case DataType.BAG:
+ res = op.getNext(dummyBag);
+ break;
+ case DataType.BOOLEAN:
+ res = op.getNext(dummyBool);
+ break;
+ case DataType.BYTEARRAY:
+ res = op.getNext(dummyDBA);
+ break;
+ case DataType.CHARARRAY:
+ res = op.getNext(dummyString);
+ break;
+ case DataType.DOUBLE:
+ res = op.getNext(dummyDouble);
+ break;
+ case DataType.FLOAT:
+ res = op.getNext(dummyFloat);
+ break;
+ case DataType.INTEGER:
+ res = op.getNext(dummyInt);
+ break;
+ case DataType.LONG:
+ res = op.getNext(dummyLong);
+ break;
+ case DataType.MAP:
+ res = op.getNext(dummyMap);
+ break;
+ case DataType.TUPLE:
+ res = op.getNext(dummyTuple);
+ break;
+ }
+ if(res.returnStatus!=POStatus.STATUS_OK)
+ return new Result();
+ secondaryResLst.add(res);
+ }
+ }
+ // If we are using secondary sort key, our new key is:
+ // (nullable, index, (key, secondary key), value)
+ res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
return res;
}
return inp;
}
- protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
- //Construct key
+ protected Object getKeyFromResult(List<Result> resLst, byte type) throws ExecException {
Object key;
-
if(resLst.size()>1){
Tuple t = mTupleFactory.newTuple(resLst.size());
int i=-1;
for(Result res : resLst)
t.set(++i, res.result);
key = t;
- } else if (resLst.size() == 1 && keyType == DataType.TUPLE) {
+ } else if (resLst.size() == 1 && type == DataType.TUPLE) {
// We get here after merging multiple jobs that have different
// map key types into a single job during multi-query optimization.
@@ -319,6 +389,21 @@
else{
key = resLst.get(0).result;
}
+ return key;
+ }
+
+ protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
+ //Construct key
+ Object key;
+ Object secondaryKey=null;
+
+
+ if (secondaryResLst!=null && secondaryResLst.size()>0)
+ {
+ key = getKeyFromResult(resLst, mainKeyType);
+ secondaryKey = getKeyFromResult(secondaryResLst, secondaryKeyType);
+ } else
+ key = getKeyFromResult(resLst, keyType);
if (mIsDistinct) {
@@ -340,7 +425,15 @@
//Put the index, key, and value
//in a tuple and return
- lrOutput.set(1, key);
+ if (useSecondaryKey)
+ {
+ Tuple compoundKey = mTupleFactory.newTuple(2);
+ compoundKey.set(0, key);
+ compoundKey.set(1, secondaryKey);
+ lrOutput.set(1, compoundKey);
+ }
+ else
+ lrOutput.set(1, key);
// strip off the columns in the "value" which
// are present in the "key"
@@ -398,12 +491,20 @@
}
public void setKeyType(byte keyType) {
- this.keyType = keyType;
+ if (useSecondaryKey)
+ this.mainKeyType = keyType;
+ else
+ this.keyType = keyType;
}
public List<PhysicalPlan> getPlans() {
return plans;
}
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ mainKeyType = keyType;
+ }
public void setPlans(List<PhysicalPlan> plans) throws PlanException {
this.plans = plans;
@@ -464,6 +565,74 @@
}
mProjectedColsMapSize = mProjectedColsMap.size();
}
+
+ public void setSecondaryPlans(List<PhysicalPlan> plans) throws PlanException {
+ this.secondaryPlans = plans;
+ secondaryLeafOps.clear();
+ int keyIndex = 0; // zero based index for fields in the key
+ for (PhysicalPlan plan : plans) {
+ ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
+ secondaryLeafOps.add(leaf);
+
+ // don't optimize CROSS
+ if(!isCross) {
+ // Look for the leaf Ops which are POProject operators - get the
+ // the columns that these POProject Operators are projecting.
+ // They MUST be projecting either a column or '*'.
+ // Keep track of the columns which are being projected and
+ // the position in the "Key" where these will be projected to.
+ // Then we can use this information to strip off these columns
+ // from the "Value" and in POPackage stitch the right "Value"
+ // tuple back by getting these columns from the "key". The goal
+ // is reduce the amount of the data sent to Hadoop in the map.
+ if(leaf instanceof POProject) {
+ POProject project = (POProject) leaf;
+ if(project.isStar()) {
+ if(secondaryPlans.size() == 1) {
+ // note that we have a project *
+ mSecondaryProjectStar = true;
+ // key will be a tuple in this case
+ isSecondaryKeyTuple = true;
+ } else {
+ // TODO: currently "group by (*, somethingelse)" is NOT
+ // allowed. So we should never get here. But once it is
+ // allowed, we will need to handle it. For now just log
+ log.debug("Project * in group by not being optimized in key-value transfer");
+ }
+ } else {
+ try {
+ List<PhysicalOperator> preds = plan.getPredecessors(leaf);
+ if (preds==null || !(preds.get(0) instanceof POProject))
+ mSecondaryProjectedColsMap.put(project.getColumn(), keyIndex);
+ } catch (ExecException e) {
+ int errCode = 2070;
+ String msg = "Problem in accessing column from project operator.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+ }
+ if(project.getResultType() == DataType.TUPLE)
+ isSecondaryKeyTuple = true;
+ }
+ keyIndex++;
+ }
+ }
+ if(keyIndex > 1) {
+ // make a note that the "key" is a tuple
+ // this is required by POPackage to pick things
+ // off the "key" correctly to stitch together the
+ // "value"
+ isSecondaryKeyTuple = true;
+ }
+ mainKeyType = keyType;
+ keyType = DataType.TUPLE;
+ if (plans.size()>1)
+ secondaryKeyType = DataType.TUPLE;
+ else
+ {
+ secondaryKeyType = plans.get(0).getLeaves().get(0).getResultType();
+ }
+ mSecondaryProjectedColsMapSize = mSecondaryProjectedColsMap.size();
+ }
/**
* Make a deep copy of this operator.
@@ -488,6 +657,9 @@
throw cnse;
}
clone.keyType = keyType;
+ clone.mainKeyType = mainKeyType;
+ clone.secondaryKeyType = secondaryKeyType;
+ clone.useSecondaryKey = useSecondaryKey;
clone.index = index;
try {
clone.lrOutput.set(0, index);
@@ -516,6 +688,13 @@
public Map<Integer, Integer> getProjectedColsMap() {
return mProjectedColsMap;
}
+
+ /**
+ * @return the mSecondaryProjectedColsMap
+ */
+ public Map<Integer, Integer> getSecondaryProjectedColsMap() {
+ return mSecondaryProjectedColsMap;
+ }
/**
* @return the mProjectStar
@@ -525,6 +704,13 @@
}
/**
+ * @return the mSecondaryProjectStar
+ */
+ public boolean isSecondaryProjectStar() {
+ return mSecondaryProjectStar;
+ }
+
+ /**
* @return the keyTuple
*/
public boolean isKeyTuple() {
@@ -532,6 +718,13 @@
}
/**
+ * @return the isSecondaryKeyTuple flag
+ */
+ public boolean isSecondaryKeyTuple() {
+ return isSecondaryKeyTuple;
+ }
+
+ /**
* @param plans
* @throws ExecException
*/
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java Tue Nov 24 19:54:19 2009
@@ -76,7 +76,7 @@
mIsDistinct + ") - " + mKey.toString();
}
- protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+ protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
//Construct key
Object key;
if(resLst.size()>1){
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Tue Nov 24 19:54:19 2009
@@ -64,8 +64,6 @@
private List<POPackage> packages = new ArrayList<POPackage>();
transient private PigNullableWritable myKey;
-
- private int baseIndex = 0;
/**
* Constructs an operator with the specified key.
@@ -111,7 +109,7 @@
@Override
public String name() {
- return "MultiQuery Package[" + baseIndex +"] - " + getOperatorKey().toString();
+ return "MultiQuery Package - " + getOperatorKey().toString();
}
@Override
@@ -174,7 +172,6 @@
int index = (int)origIndex;
index &= idxPart;
- index -= baseIndex;
if (index >= packages.size() || index < 0) {
int errCode = 2140;
@@ -221,21 +218,4 @@
return res;
}
- /**
- * Sets the base index of this operator
- *
- * @param baseIndex the base index of this operator
- */
- public void setBaseIndex(int baseIndex) {
- this.baseIndex = baseIndex;
- }
-
- /**
- * Returns the base index of this operator
- *
- * @return the base index of this operator
- */
- public int getBaseIndex() {
- return baseIndex;
- }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Nov 24 19:54:19 2009
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -25,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.DataBag;
@@ -89,6 +92,9 @@
//key, no value.
int numInputs;
+ // True if the attaching map-reduce plan uses a secondary sort key
+ boolean useSecondaryKey = false;
+
//Denotes if inner is specified
//on a particular input
boolean[] inner;
@@ -107,8 +113,8 @@
transient private final Log log = LogFactory.getLog(getClass());
- protected static BagFactory mBagFactory = BagFactory.getInstance();
- protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ protected static final BagFactory mBagFactory = BagFactory.getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
public POPackage(OperatorKey k) {
this(k, -1, null);
@@ -157,6 +163,15 @@
public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
tupIter = inp;
key = k.getValueAsPigType();
+ if (useSecondaryKey)
+ {
+ try {
+ key = ((Tuple)key).get(0);
+ } catch (ExecException e) {
+ // TODO Exception
+ throw new RuntimeException(e);
+ }
+ }
if(isKeyTuple) {
// key is a tuple, cache the key as a
// tuple for use in the getNext()
@@ -187,7 +202,7 @@
public void setInner(boolean[] inner) {
this.inner = inner;
}
-
+
/**
* From the inputs, constructs the output tuple
* for this co-group in the required format which
@@ -206,38 +221,51 @@
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
- String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
- }
-
- for (int i = 0; i < numInputs; i++) {
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- dbs[i] = mBagFactory.newDefaultBag();
- } else {
- dbs[i] = new InternalCachedBag(numInputs);
- }
- }
-
- //For each indexed tup in the inp, sort them
- //into their corresponding bags based
- //on the index
- while (tupIter.hasNext()) {
- NullableTuple ntup = tupIter.next();
- int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
+ if (isAccumulative()) {
+ // create bag wrappers that pull tuples in multiple batches;
+ // all bags hold a reference to the same tuple buffer,
+ // which contains the tuples from one batch
+ POPackageTupleBuffer buffer = new POPackageTupleBuffer();
+ for (int i = 0; i < numInputs; i++) {
+ dbs[i] = new AccumulativeBag(buffer, i);
+ }
- if (numInputs == 1) {
+ } else {
+ // create bag to pull all tuples out of iterator
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+ bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ }
+
+
+ for (int i = 0; i < numInputs; i++) {
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ dbs[i] = mBagFactory.newDefaultBag();
+ } else {
+ dbs[i] = new InternalCachedBag(numInputs);
+ }
+ }
+
+ //For each indexed tup in the inp, sort them
+ //into their corresponding bags based
+ //on the index
+ while (tupIter.hasNext()) {
+ NullableTuple ntup = tupIter.next();
+ int index = ntup.getIndex();
+ Tuple copy = getValueTuple(ntup, index);
- // this is for multi-query merge where
- // the numInputs is always 1, but the index
- // (the position of the inner plan in the
- // enclosed operator) may not be 1.
- dbs[0].add(copy);
- } else {
- dbs[index].add(copy);
+ if (numInputs == 1) {
+
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosed operator) may not be 1.
+ dbs[0].add(copy);
+ } else {
+ dbs[index].add(copy);
+ }
+ if(reporter!=null) reporter.progress();
}
- if(reporter!=null) reporter.progress();
}
//Construct the output tuple by appending
@@ -247,14 +275,16 @@
res.set(0,key);
int i=-1;
for (DataBag bag : dbs) {
- if(inner[++i]){
+ i++;
+ if(inner[i] && !isAccumulative()){
if(bag.size()==0){
detachInput();
Result r = new Result();
r.returnStatus = POStatus.STATUS_NULL;
return r;
}
- }
+ }
+
res.set(i+1,bag);
}
}
@@ -403,5 +433,75 @@
public void setDistinct(boolean distinct) {
this.distinct = distinct;
}
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+ private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
+ private List<Tuple>[] bags;
+ private Iterator<NullableTuple> iter;
+ private int batchSize;
+ private Object currKey;
+
+ @SuppressWarnings("unchecked")
+ public POPackageTupleBuffer() {
+ batchSize = 20000;
+ if (PigMapReduce.sJobConf != null) {
+ String size = PigMapReduce.sJobConf.get("pig.accumulative.batchsize");
+ if (size != null) {
+ batchSize = Integer.parseInt(size);
+ }
+ }
+
+ this.bags = new List[numInputs];
+ for(int i=0; i<numInputs; i++) {
+ this.bags[i] = new ArrayList<Tuple>();
+ }
+ this.iter = tupIter;
+ this.currKey = key;
+ }
+
+ @Override
+ public boolean hasNextBatch() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public void nextBatch() throws IOException {
+ for(int i=0; i<bags.length; i++) {
+ bags[i].clear();
+ }
+
+ key = currKey;
+ for(int i=0; i<batchSize; i++) {
+ if (iter.hasNext()) {
+ NullableTuple ntup = iter.next();
+ int index = ntup.getIndex();
+ Tuple copy = getValueTuple(ntup, index);
+ if (numInputs == 1) {
+
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosed operator) may not be 1.
+ bags[0].add(copy);
+ } else {
+ bags[index].add(copy);
+ }
+ }
+ }
+ }
+
+ public void clear() {
+ for(int i=0; i<bags.length; i++) {
+ bags[i].clear();
+ }
+ iter = null;
+ }
+
+ public Iterator<Tuple> getTuples(int index) {
+ return bags[index].iterator();
+ }
+ };
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Tue Nov 24 19:54:19 2009
@@ -23,6 +23,10 @@
import java.util.Iterator;
import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -96,6 +100,10 @@
public POPackageLite clone() throws CloneNotSupportedException {
POPackageLite clone = (POPackageLite)super.clone();
clone.inner = null;
+ clone.keyInfo = new HashMap<Integer, Pair<Boolean,Map<Integer,Integer>>>();
+ for (Entry<Integer, Pair<Boolean, Map<Integer,Integer>>> entry: keyInfo.entrySet()) {
+ clone.keyInfo.put(entry.getKey(), entry.getValue());
+ }
return clone;
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Tue Nov 24 19:54:19 2009
@@ -207,7 +207,7 @@
// Returns bag of tuples
protected DataBag constructPROutput(List<Result> resLst, Tuple value) throws ExecException{
- Tuple t = super.constructLROutput(resLst, value);
+ Tuple t = super.constructLROutput(resLst, null, value);
//Construct key
Object key = t.get(1);