You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/06 12:54:14 UTC
svn commit: r1585283 [2/3] - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/fetch/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/ap...
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Sun Apr 6 10:54:13 2014
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.pen.Illustrator;
+
+/**
+ * Packager that drives an internal {@link POOptimizedForEach}: all inputs
+ * except the last are passed to the foreach as whole bags, while the last
+ * input is fed in chunks of at most {@code chunkSize} tuples.
+ */
+public class JoinPackager extends Packager {
+
+ private POOptimizedForEach forEach;
+ // true until the per-key state (dbs, res, lastBagIter) has been set up
+ private boolean newKey = true;
+ // input tuple handed to forEach: (key, bag#1, ..., bag#n)
+ private Tuple res = null;
+ private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+
+ public static final String DEFAULT_CHUNK_SIZE = "1000";
+
+ private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE);
+ // most recent result pulled from forEach
+ private Result forEachResult;
+ private DataBag[] dbs = null;
+
+ private int lastBagIndex;
+
+ private Iterator<Tuple> lastBagIter;
+
+ /**
+ * Creates a join packager that copies its packaging settings from an
+ * existing packager and its plans from an existing foreach.
+ *
+ * @param p packager whose key type, number of inputs, inner flags, key
+ * info and key markers are copied; may be null, in which case
+ * none of them is set here
+ * @param f foreach supplying the operator scope, the input plans and the
+ * flatten flags; must not be null
+ * @throws NullPointerException if f is null
+ */
+ public JoinPackager(Packager p, POForEach f) {
+ super();
+ // f provides the scope of the internal operator key, so it is
+ // mandatory: fail with a clear message rather than a bare NPE.
+ if (f == null) {
+ throw new NullPointerException("POForEach f must not be null");
+ }
+ String scope = f.getOperatorKey().getScope();
+ NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+ forEach = new POOptimizedForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ if (p != null) {
+ setKeyType(p.getKeyType());
+ setNumInputs(p.getNumInputs());
+ lastBagIndex = numInputs - 1;
+ setInner(p.getInner());
+ setKeyInfo(p.getKeyInfo());
+ this.isKeyTuple = p.isKeyTuple;
+ this.isKeyCompound = p.isKeyCompound;
+ }
+ setInputPlans(f.getInputPlans());
+ setToBeFlattened(f.getToBeFlattened());
+ }
+
+ /**
+ * Returns the next ForEach result for the attached key.
+ * <p>
+ * The first n-1 attached bags are passed to the delegated ForEach operator
+ * as they are; the last input is fed to it in chunks, so the ForEach input
+ * is
+ *
+ * (key, input#1, input#2, ... input#n-1, chunk of input#n)
+ *
+ * For every ForEach input, all results are pulled from ForEach.
+ * getNext is called multiple times for a particular key and returns one
+ * ForEach result per call, so internal state is kept to track where we
+ * are.
+ *
+ * @return a ForEach result with status OK or ERR, or the shared EOP result
+ * once the last input is exhausted, is empty, or an inner input
+ * bag is empty
+ * @throws ExecException propagated from the bag and ForEach operations
+ */
+ @Override
+ public Result getNext() throws ExecException {
+ // First call for a key: build the ForEach input tuple
+ // res = (key, input#1, input#2....input#n); only the bag for input#n
+ // still has to be filled, which happens chunk by chunk below.
+ if (newKey) {
+ // Use the first n-1 attached bags directly
+ dbs = new DataBag[numInputs];
+ for (int i = 0; i < numInputs - 1; i++) {
+ dbs[i] = bags[i];
+ }
+
+ // For last bag, we always use NonSpillableBag.
+ dbs[lastBagIndex] = new NonSpillableDataBag((int)chunkSize);
+
+ lastBagIter = bags[lastBagIndex].iterator();
+
+ // If we don't have any tuple for input#n
+ // we do not need any further process, return EOP
+ if (!lastBagIter.hasNext()) {
+ // we will return at this point because we ought
+ // to be having a flatten on this last input
+ // and we have an empty bag which should result
+ // in this key being taken out of the output
+ newKey = true;
+ return eopResult;
+ }
+
+ res = mTupleFactory.newTuple(numInputs+1);
+ for (int i = 0; i < dbs.length; i++) {
+ res.set(i+1,dbs[i]);
+ }
+
+ res.set(0,key);
+ // if we have an inner anywhere and the corresponding
+ // bag is empty, we can just return
+ for (int i = 0; i < dbs.length - 1; i++) {
+ if(inner[i]&&dbs[i].size()==0){
+ detachInput();
+ return eopResult;
+ }
+ }
+ newKey = false;
+ }
+
+ // Keep attaching input tuple to ForEach, until:
+ // 1. We can initialize ForEach.getNext();
+ // 2. There is no more input#n
+ while (lastBagIter.hasNext() || forEach.processingPlan) {
+ // if a previous call to foreach.getNext()
+ // has still not returned all output, process it
+ while (forEach.processingPlan) {
+ forEachResult = forEach.getNextTuple();
+ switch (forEachResult.returnStatus) {
+ case POStatus.STATUS_OK:
+ case POStatus.STATUS_ERR:
+ return forEachResult;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ break;
+ }
+ }
+
+ if (lastBagIter.hasNext()) {
+ // try setting up a bag of CHUNKSIZE OR
+ // the remainder of the bag of last input
+ // (if < CHUNKSIZE) to foreach
+ dbs[lastBagIndex].clear(); // clear last chunk
+ for (int i = 0; i < chunkSize && lastBagIter.hasNext(); i++) {
+ dbs[lastBagIndex].add(lastBagIter.next());
+ }
+ } else {
+ detachInput();
+ return eopResult;
+ }
+
+ // Attach the input to forEach
+ forEach.attachInput(res);
+
+ // pull output tuple from ForEach; stored in the field rather than
+ // in a local of the same name that would shadow it
+ forEachResult = forEach.getNextTuple();
+ switch (forEachResult.returnStatus) {
+ case POStatus.STATUS_OK:
+ case POStatus.STATUS_ERR:
+ return forEachResult;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ break;
+ }
+ }
+ detachInput();
+ return eopResult;
+ }
+
+ /**
+ * Attaches the key and bags for the next key and marks the packager as
+ * being at the start of a new key.
+ *
+ * @param key the key the bags belong to
+ * @param bags one bag per input; every read-once bag except the last is
+ * replaced in this array by a materialized copy
+ * @param readOnce per-input flags telling whether the bag is read-once
+ * @throws ExecException if the last input is not flagged as read-once
+ */
+ @Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ checkBagType();
+
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // JoinPackager expects all but the last bag to be materialized
+ for (int i = 0; i < bags.length - 1; i++) {
+ if (readOnce[i]) {
+ DataBag materializedBag = getBag();
+ materializedBag.addAll(bags[i]);
+ bags[i] = materializedBag;
+ }
+ }
+ if (!readOnce[numInputs - 1]) {
+ throw new ExecException(
+ "JoinPackager expects the last input to be streamed");
+ }
+ this.newKey = true;
+ }
+
+ /**
+ * @return the input plans of the internal foreach
+ */
+ public List<PhysicalPlan> getInputPlans() {
+ return forEach.getInputPlans();
+ }
+
+ /**
+ * @param plans the input plans to set on the internal foreach
+ */
+ public void setInputPlans(List<PhysicalPlan> plans) {
+ forEach.setInputPlans(plans);
+ }
+
+ /**
+ * @param flattens the flatten flags to set on the internal foreach
+ */
+ public void setToBeFlattened(List<Boolean> flattens) {
+ forEach.setToBeFlattened(flattens);
+ }
+
+ /**
+ * @return the forEach
+ */
+ public POOptimizedForEach getForEach() {
+ return forEach;
+ }
+
+ /**
+ * @param chunkSize - the chunk size for the biggest input
+ */
+ public void setChunkSize(long chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ /**
+ * Sets the illustrator on this packager and on the internal foreach.
+ *
+ * @param illustrator the illustrator to use
+ */
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ this.illustrator = illustrator;
+ forEach.setIllustrator(illustrator);
+ }
+
+ /**
+ * @return the simple class name followed by the flat string of the
+ * internal foreach in parentheses
+ */
+ @Override
+ public String name() {
+ return this.getClass().getSimpleName() + "(" + forEach.getFlatStr() + ")";
+ }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Sun Apr 6 10:54:13 2014
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * This package operator is a specialization
+ * of POPackage operator used for the specific
+ * case of the order by query. See JIRA 802
+ * for more details.
+ */
+public class LitePackager extends Packager {
+
+ private PigNullableWritable keyWritable;
+
+ /**
+ * @return always null; this packager keeps no inner flags
+ */
+ @Override
+ public boolean[] getInner() {
+ return null;
+ }
+
+ /**
+ * Ignored; this packager keeps no inner flags.
+ *
+ * @param inner unused
+ */
+ @Override
+ public void setInner(boolean[] inner) {
+ }
+
+ /**
+ * Makes a copy of this operator. The copy has no inner flags and gets its
+ * own keyInfo map; the entries of that map are shared with this instance.
+ *
+ * @return the copy
+ * @throws CloneNotSupportedException
+ */
+ @Override
+ public LitePackager clone() throws CloneNotSupportedException {
+ LitePackager clone = (LitePackager) super.clone();
+ clone.inner = null;
+ if (keyInfo != null) {
+ clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(keyInfo);
+ }
+ return clone;
+ }
+
+ /**
+ * @return always false
+ */
+ @Override
+ public boolean isDistinct() {
+ return false;
+ }
+
+ /**
+ * Ignored; {@link #isDistinct()} always returns false.
+ *
+ * @param distinct unused
+ */
+ @Override
+ public void setDistinct(boolean distinct) {
+ }
+
+ /**
+ * Similar to POPackage.getNext except that
+ * only one input is expected with index 0
+ * and ReadOnceBag is used instead of
+ * DefaultDataBag.
+ *
+ * @return an EOP result when no bags are attached, otherwise an OK result
+ * holding the tuple (key, bags[0])
+ */
+ @Override
+ public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ //Construct the output tuple from the key
+ //and the single attached bag and return it.
+ Tuple res = mTupleFactory.newTuple(numInputs+1);
+ res.set(0,key);
+ res.set(1, bags[0]);
+ detachInput();
+ Result r = new Result();
+ r.returnStatus = POStatus.STATUS_OK;
+ r.result = illustratorMarkup(null, res, 0);
+ return r;
+ }
+
+ /**
+ * Makes use of the superclass method, but this requires an additional
+ * parameter key passed by ReadOnceBag. key of this instance will be set to
+ * null in detachInput call, but an instance of ReadOnceBag may have the
+ * original key that it uses. Therefore this extra argument is taken to
+ * temporarily set it before the call to the superclass method and then
+ * restore it. The previous value is restored even when the superclass
+ * call throws.
+ */
+ @Override
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ PigNullableWritable origKey = this.keyWritable;
+ this.keyWritable = keyWritable;
+ try {
+ return super.getValueTuple(keyWritable, ntup, index);
+ } finally {
+ // restore on both the normal and the exceptional path
+ this.keyWritable = origKey;
+ }
+ }
+
+ /**
+ * Registers the output tuple with the illustrator, if one is set.
+ *
+ * @param in unused
+ * @param out the output tuple
+ * @param eqClassIndex index of the equivalence class the tuple is added to
+ * @return out itself when no illustrator is set, otherwise an ExampleTuple
+ * wrapping it
+ */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ if (illustrator == null) {
+ return (Tuple) out;
+ }
+ ExampleTuple tOut = new ExampleTuple((Tuple) out);
+ LineageTracer lineageTracer = illustrator.getLineage();
+ lineageTracer.insert(tOut);
+ if (illustrator.getEquivalenceClasses() == null) {
+ // lazily create one empty equivalence class per input
+ LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ for (int i = 0; i < numInputs; ++i) {
+ IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+ equivalenceClasses.add(equivalenceClass);
+ }
+ illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+ }
+ illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+ tOut.synthetic = false; // not expect this to be really used
+ illustrator.addData(tOut);
+ return tOut;
+ }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Sun Apr 6 10:54:13 2014
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.NullableUnknownWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * The package operator that packages the globally rearranged tuples
+ * into output format as required by multi-query de-multiplexer.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization.
+ * The package operators of the reduce plans of the splittees form an
+ * indexed package list inside this operator. When this operator
+ * receives an input, it extracts the index from the key and calls the
+ * corresponding package to get the output data.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another multi-query packager.
+ * <p>
+ * The successor of this operator must be a PODemux operator which
+ * knows how to consume the output of this operator.
+ */
+public class MultiQueryPackager extends Packager {
+
+ private static final long serialVersionUID = 1L;
+
+ // mask applied to a key index to obtain the position in the packager list
+ private static final int idxPart = 0x7F;
+
+ private List<Packager> packagers = new ArrayList<Packager>();
+
+ /**
+ * If the POLocalRearranges corresponding to the reduce plans in
+ * myPlans (the list of inner plans of the demux) have different key types
+ * then the MultiQueryOptimizer converts all the keys to be of type tuple
+ * by wrapping any non-tuple keys into Tuples (keys which are already tuples
+ * are left alone).
+ * The list below is a list of booleans indicating whether extra tuple wrapping
+ * was done for the key in the corresponding POLocalRearranges and if we need
+ * to "unwrap" the tuple to get to the key
+ */
+ private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+
+ /*
+ * Indicating if all the inner plans have the same
+ * map key type. If not, the keys passed in are
+ * wrapped inside tuples and need to be extracted
+ * out during the reduce phase
+ */
+ private boolean sameMapKeyType = true;
+
+ /*
+ * Indicating if this operator is in a combiner.
+ * If not, this operator is in a reducer and the key
+ * values must first be extracted from the tuple-wrap
+ * before writing out to the disk
+ */
+ private boolean inCombiner = false;
+
+ private PigNullableWritable keyWritable = null;
+
+ /**
+ * Appends the specified packager to the end of
+ * the packager list. No key-wrapping flag is recorded for it.
+ *
+ * @param pkgr packager to be appended to the list
+ */
+ public void addPackager(Packager pkgr) {
+ packagers.add(pkgr);
+ }
+
+ /**
+ * Appends the specified packager to the end of
+ * the packager list and records whether its key is tuple-wrapped.
+ *
+ * @param pkgr packager to be appended to the list
+ * @param mapKeyType the map key type associated with the packager
+ */
+ public void addPackager(Packager pkgr, byte mapKeyType) {
+ packagers.add(pkgr);
+ // if mapKeyType is already a tuple, we will NOT
+ // be wrapping it in an extra tuple. If it is not
+ // a tuple, we will wrap it in a tuple
+ isKeyWrapped.add(mapKeyType != DataType.TUPLE);
+ }
+
+ /**
+ * Returns the list of packagers.
+ *
+ * @return the list of the packagers
+ */
+ public List<Packager> getPackagers() {
+ return packagers;
+ }
+
+ /**
+ * Constructs the output tuple from the inputs.
+ * <p>
+ * The output is consumed by the demultiplexer operator
+ * (PODemux) in the format (key, {bag of tuples}) where key
+ * is an indexed WritableComparable, not the wrapped value as a pig type.
+ *
+ * @return an EOP result when no bags are attached; otherwise the result of
+ * the selected packager, whose tuple (if any) has its first field
+ * replaced by an indexed PigNullableWritable
+ * @throws ExecException if the key index does not select a packager
+ */
+ @Override
+ public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ byte origIndex = keyWritable.getIndex();
+
+ int index = (int)origIndex;
+ index &= idxPart;
+
+ if (index >= packagers.size() || index < 0) {
+ int errCode = 2140;
+ String msg = "Invalid package index " + index
+ + " should be in the range between 0 and "
+ + packagers.size();
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ Packager pkgr = packagers.get(index);
+
+ // check to see if we need to unwrap the key. The keys may be
+ // wrapped inside a tuple by LocalRearrange operator when jobs
+ // with different map key types are merged
+ PigNullableWritable curKey = keyWritable;
+ if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+ Tuple tup = (Tuple) keyWritable.getValueAsPigType();
+ curKey = HDataType.getWritableComparableTypes(tup.get(0),
+ pkgr.getKeyType());
+ curKey.setIndex(origIndex);
+ }
+
+ pkgr.attachInput(curKey, bags, readOnce);
+
+ Result res = pkgr.getNext();
+ pkgr.detachInput();
+ detachInput();
+
+ Tuple tuple = (Tuple)res.result;
+ if (tuple == null) {
+ // the delegate produced no tuple, so there is no key field to
+ // index; hand its result back unchanged instead of failing
+ // with a NullPointerException below
+ return res;
+ }
+
+ // the object present in the first field
+ // of the tuple above is the real data without
+ // index information - this is because the
+ // package above, extracts the real data out of
+ // the PigNullableWritable object - we are going to
+ // give this result tuple to a PODemux operator
+ // which needs a PigNullableWritable first field so
+ // it can figure out the index. Therefore we need
+ // to add index to the first field of the tuple.
+
+ Object obj = tuple.get(0);
+ if (obj instanceof PigNullableWritable) {
+ ((PigNullableWritable)obj).setIndex(origIndex);
+ }
+ else {
+ PigNullableWritable myObj = null;
+ if (obj == null) {
+ myObj = new NullableUnknownWritable();
+ myObj.setNull(true);
+ }
+ else {
+ myObj = HDataType.getWritableComparableTypes(obj, HDataType.findTypeFromNullableWritable(curKey));
+ }
+ myObj.setIndex(origIndex);
+ tuple.set(0, myObj);
+ }
+ // illustrator markup has been handled by "pkgr"
+ return res;
+ }
+
+ /**
+ * Returns the list of booleans that indicates if the
+ * key needs to be unwrapped for the corresponding plan.
+ *
+ * @return an unmodifiable view of the isKeyWrapped boolean values
+ */
+ public List<Boolean> getIsKeyWrappedList() {
+ return Collections.unmodifiableList(isKeyWrapped);
+ }
+
+ /**
+ * Adds a list of IsKeyWrapped boolean values
+ *
+ * @param lst the list of boolean values to add
+ */
+ public void addIsKeyWrappedList(List<Boolean> lst) {
+ isKeyWrapped.addAll(lst);
+ }
+
+ /**
+ * @param inCombiner whether this operator is in a combiner
+ */
+ public void setInCombiner(boolean inCombiner) {
+ this.inCombiner = inCombiner;
+ }
+
+ /**
+ * @return whether this operator is in a combiner
+ */
+ public boolean isInCombiner() {
+ return inCombiner;
+ }
+
+ /**
+ * @param sameMapKeyType whether all the inner plans have the same map key
+ * type
+ */
+ public void setSameMapKeyType(boolean sameMapKeyType) {
+ this.sameMapKeyType = sameMapKeyType;
+ }
+
+ /**
+ * @return whether all the inner plans have the same map key type
+ */
+ public boolean isSameMapKeyType() {
+ return sameMapKeyType;
+ }
+
+ /**
+ * Delegates to the packager selected by the masked index.
+ *
+ * @param index the key index
+ * @return the number of inputs reported by the selected packager
+ */
+ @Override
+ public int getNumInputs(byte index) {
+ return packagers.get(index & idxPart).getNumInputs(index);
+ }
+
+ /**
+ * Remembers the key writable for the following {@link #getNext()} call
+ * and delegates to the packager selected by the masked index.
+ *
+ * @param keyWritable the key the value belongs to
+ * @param ntup the value
+ * @param index the index of the value
+ * @return the value tuple built by the selected packager
+ */
+ @Override
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ this.keyWritable = keyWritable;
+ return packagers.get(index & idxPart).getValueTuple(
+ keyWritable, ntup, index);
+ }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Sun Apr 6 10:54:13 2014
@@ -21,32 +21,29 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.Illustrator;
/**
* The package operator that packages
@@ -67,50 +64,18 @@ public class POPackage extends PhysicalO
*/
private static final long serialVersionUID = 1L;
-
- public static enum PackageType { GROUP, JOIN };
-
//The iterator of indexed Tuples
//that is typically provided by
//Hadoop
transient Iterator<NullableTuple> tupIter;
//The key being worked on
- Object key;
-
- // marker to indicate if key is a tuple
- protected boolean isKeyTuple = false;
- // marker to indicate if the tuple key is compound in nature
- protected boolean isKeyCompound = false;
- // key as a Tuple object (if the key is a tuple)
- protected Tuple keyAsTuple;
-
- //key's type
- byte keyType;
+ protected Object key;
//The number of inputs to this
//co-group. 0 indicates a distinct, which means there will only be a
//key, no value.
- int numInputs;
-
- // If the attaching map-reduce plan use secondary sort key
- boolean useSecondaryKey = false;
-
- //Denotes if inner is specified
- //on a particular input
- boolean[] inner;
-
- // flag to denote whether there is a distinct
- // leading to this package
- protected boolean distinct = false;
-
- // A mapping of input index to key information got from LORearrange
- // for that index. The Key information is a pair of boolean, Map.
- // The boolean indicates whether there is a lone project(*) in the
- // cogroup by. If not, the Map has a mapping of column numbers in the
- // "value" to column numbers in the "key" which contain the fields in
- // the "value"
- protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+ protected int numInputs;
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -119,7 +84,9 @@ public class POPackage extends PhysicalO
private boolean useDefaultBag = false;
- private PackageType pkgType;
+ protected Packager pkgr;
+
+ private PigNullableWritable keyWritable;
public POPackage(OperatorKey k) {
this(k, -1, null);
@@ -134,16 +101,27 @@ public class POPackage extends PhysicalO
}
public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ this(k, rp, inp, new Packager());
+ }
+
+ public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
+ Packager pkgr) {
super(k, rp, inp);
numInputs = -1;
- keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+ this.pkgr = pkgr;
+ }
+
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ super.setIllustrator(illustrator);
+ pkgr.setIllustrator(illustrator);
}
@Override
public String name() {
- return getAliasString() + "Package" + "["
+ return getAliasString() + "Package" + "(" + pkgr.name() + ")" + "["
+ DataType.findTypeName(resultType) + "]" + "{"
- + DataType.findTypeName(keyType) + "}" + " - "
+ + DataType.findTypeName(pkgr.getKeyType()) + "}" + " - "
+ mKey.toString();
}
@@ -171,16 +149,9 @@ public class POPackage extends PhysicalO
public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
try {
tupIter = inp;
- key = k.getValueAsPigType();
- if (useSecondaryKey) {
- key = ((Tuple)key).get(0);
-
- }
- if(isKeyTuple) {
- // key is a tuple, cache the key as a
- // tuple for use in the getNext()
- keyAsTuple = (Tuple)key;
- }
+ key = pkgr.getKey(k);
+ keyWritable = k;
+ inputAttached = true;
} catch (Exception e) {
throw new RuntimeException(
"Error attaching input for key " + k +
@@ -191,9 +162,11 @@ public class POPackage extends PhysicalO
/**
* attachInput's better half!
*/
+ @Override
public void detachInput() {
tupIter = null;
key = null;
+ inputAttached = false;
}
public int getNumInps() {
@@ -202,46 +175,38 @@ public class POPackage extends PhysicalO
public void setNumInps(int numInps) {
this.numInputs = numInps;
- }
-
- public boolean[] getInner() {
- return inner;
- }
-
- public void setInner(boolean[] inner) {
- this.inner = inner;
+ pkgr.setNumInputs(numInps);
}
/**
- * From the inputs, constructs the output tuple
- * for this co-group in the required format which
- * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
+ * From the inputs, constructs the output tuple for this co-group in the
+ * required format which is (key, {bag of tuples from input 1}, {bag of
+ * tuples from input 2}, ...)
*/
@Override
public Result getNextTuple() throws ExecException {
- Tuple res;
-
if(firstTime){
firstTime = false;
if (PigMapReduce.sJobConfInternal.get() != null) {
- String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ String bagType = PigMapReduce.sJobConfInternal.get().get(
+ "pig.cachedbag.type");
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
}
}
+ int numInputs = pkgr.getNumInputs(keyWritable.getIndex());
+ boolean[] readOnce = new boolean[numInputs];
+ for (int i = 0; i < numInputs; i++)
+ readOnce[i] = false;
- if(distinct) {
- // only set the key which has the whole
- // tuple
- res = mTupleFactory.newTuple(1);
- res.set(0, key);
- } else {
- //Create numInputs bags
+ if (isInputAttached()) {
+ // Create numInputs bags
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
if (isAccumulative()) {
+ readOnce[numInputs - 1] = false;
// create bag wrapper to pull tuples in many batches
// all bags have reference to the sample tuples buffer
// which contains tuples from one batch
@@ -251,22 +216,35 @@ public class POPackage extends PhysicalO
}
} else {
+ readOnce[numInputs - 1] = true;
+ // We know the tuples will come sorted by index, so we can wrap
+ // the last input in a ReadOnceBag and let the Packager decide
+ // whether or not to read into memory
+
// create bag to pull all tuples out of iterator
for (int i = 0; i < numInputs; i++) {
- dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
- // In a very rare case if there is a POStream after this
- // POPackage in the pipeline and is also blocking the pipeline;
- // constructor argument should be 2 * numInputs. But for one obscure
- // case we don't want to pay the penalty all the time.
+ dbs[i] = useDefaultBag ? BagFactory.getInstance()
+ .newDefaultBag()
+ // In a very rare case if there is a POStream after this
+ // POPackage in the pipeline and is also blocking the
+ // pipeline;
+ // constructor argument should be 2 * numInputs. But for one
+ // obscure
+ // case we don't want to pay the penalty all the time.
: new InternalCachedBag(numInputs);
}
- //For each indexed tup in the inp, sort them
- //into their corresponding bags based
- //on the index
+ // For each indexed tup in the inp, sort them
+ // into their corresponding bags based
+ // on the index
while (tupIter.hasNext()) {
NullableTuple ntup = tupIter.next();
int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
+ if (index == numInputs - 1) {
+ dbs[index] = new PeekedBag(pkgr, ntup, tupIter,
+ keyWritable);
+ break;
+ }
+ Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
if (numInputs == 1) {
@@ -278,109 +256,29 @@ public class POPackage extends PhysicalO
} else {
dbs[index].add(copy);
}
- if(getReporter()!=null) {
+ if (getReporter() != null) {
getReporter().progress();
}
}
}
-
- //Construct the output tuple by appending
- //the key and all the above constructed bags
- //and return it.
- res = mTupleFactory.newTuple(numInputs+1);
- res.set(0,key);
- int i=-1;
- for (DataBag bag : dbs) {
- i++;
- if(inner[i] && !isAccumulative()){
- if(bag.size()==0){
- detachInput();
- Result r = new Result();
- r.returnStatus = POStatus.STATUS_NULL;
- return r;
- }
- }
-
- res.set(i+1,bag);
- }
+ // Construct the output tuple by appending
+ // the key and all the above constructed bags
+ // and return it.
+ pkgr.attachInput(key, dbs, readOnce);
+ detachInput();
}
- Result r = new Result();
- r.returnStatus = POStatus.STATUS_OK;
- if (!isAccumulative())
- r.result = illustratorMarkup(null, res, 0);
- else
- r.result = res;
- detachInput();
- return r;
- }
-
- protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException {
- // Need to make a copy of the value, as hadoop uses the same ntup
- // to represent each value.
- Tuple val = (Tuple)ntup.getValueAsPigType();
-
- Tuple copy = null;
- // The "value (val)" that we just got may not
- // be the complete "value". It may have some portions
- // in the "key" (look in POLocalRearrange for more comments)
- // If this is the case we need to stitch
- // the "value" together.
- Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(index);
- boolean isProjectStar = lrKeyInfo.first;
- Map<Integer, Integer> keyLookup = lrKeyInfo.second;
- int keyLookupSize = keyLookup.size();
-
- if( keyLookupSize > 0) {
-
- // we have some fields of the "value" in the
- // "key".
- int finalValueSize = keyLookupSize + val.size();
- copy = mTupleFactory.newTuple(finalValueSize);
- int valIndex = 0; // an index for accessing elements from
- // the value (val) that we have currently
- for(int i = 0; i < finalValueSize; i++) {
- Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
- // the field for this index is not in the
- // key - so just take it from the "value"
- // we were handed
- copy.set(i, val.get(valIndex));
- valIndex++;
- } else {
- // the field for this index is in the key
- if(isKeyTuple && isKeyCompound) {
- // the key is a tuple, extract the
- // field out of the tuple
- copy.set(i, keyAsTuple.get(keyIndex));
- } else {
- copy.set(i, key);
- }
- }
- }
- copy = illustratorMarkup2(val, copy);
- } else if (isProjectStar) {
- // the whole "value" is present in the "key"
- copy = mTupleFactory.newTuple(keyAsTuple.getAll());
- copy = illustratorMarkup2(keyAsTuple, copy);
- } else {
-
- // there is no field of the "value" in the
- // "key" - so just make a copy of what we got
- // as the "value"
- copy = mTupleFactory.newTuple(val.getAll());
- copy = illustratorMarkup2(val, copy);
- }
- return copy;
+ return pkgr.getNext();
}
- public byte getKeyType() {
- return keyType;
+ public Packager getPkgr() {
+ return pkgr;
}
- public void setKeyType(byte keyType) {
- this.keyType = keyType;
+ public void setPkgr(Packager pkgr) {
+ this.pkgr = pkgr;
+ pkgr.setParent(this);
+ pkgr.setIllustrator(illustrator);
}
/**
@@ -393,74 +291,11 @@ public class POPackage extends PhysicalO
clone.mKey = new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
clone.requestedParallelism = requestedParallelism;
clone.resultType = resultType;
- clone.keyType = keyType;
clone.numInputs = numInputs;
- if (inner!=null)
- {
- clone.inner = new boolean[inner.length];
- for (int i = 0; i < inner.length; i++) {
- clone.inner[i] = inner[i];
- }
- }
- else
- clone.inner = null;
+ clone.pkgr = (Packager) this.pkgr.clone();
return clone;
}
- /**
- * @param keyInfo the keyInfo to set
- */
- public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
- this.keyInfo = keyInfo;
- }
-
- /**
- * @param keyTuple the keyTuple to set
- */
- public void setKeyTuple(boolean keyTuple) {
- this.isKeyTuple = keyTuple;
- }
-
- /**
- * @param keyCompound the keyCompound to set
- */
- public void setKeyCompound(boolean keyCompound) {
- this.isKeyCompound = keyCompound;
- }
-
- /**
- * @return the keyInfo
- */
- public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
- return keyInfo;
- }
-
- /**
- * @return the distinct
- */
- public boolean isDistinct() {
- return distinct;
- }
-
- /**
- * @param distinct the distinct to set
- */
- public void setDistinct(boolean distinct) {
- this.distinct = distinct;
- }
-
- public void setUseSecondaryKey(boolean useSecondaryKey) {
- this.useSecondaryKey = useSecondaryKey;
- }
-
- public void setPackageType(PackageType type) {
- this.pkgType = type;
- }
-
- public PackageType getPackageType() {
- return pkgType;
- }
-
class POPackageTupleBuffer implements AccumulativeTupleBuffer {
private List<Tuple>[] bags;
private Iterator<NullableTuple> iter;
@@ -499,25 +334,26 @@ public class POPackage extends PhysicalO
key = currKey;
for(int i=0; i<batchSize; i++) {
if (iter.hasNext()) {
- NullableTuple ntup = iter.next();
- int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
- if (numInputs == 1) {
-
- // this is for multi-query merge where
- // the numInputs is always 1, but the index
- // (the position of the inner plan in the
- // enclosed operator) may not be 1.
- bags[0].add(copy);
- } else {
- bags[index].add(copy);
- }
+ NullableTuple ntup = iter.next();
+ int index = ntup.getIndex();
+ Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
+ if (numInputs == 1) {
+
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosed operator) may not be 1.
+ bags[0].add(copy);
+ } else {
+ bags[index].add(copy);
+ }
}else{
break;
}
}
}
+ @Override
public void clear() {
for(int i=0; i<bags.length; i++) {
bags[i].clear();
@@ -525,6 +361,7 @@ public class POPackage extends PhysicalO
iter = null;
}
+ @Override
public Iterator<Tuple> getTuples(int index) {
return bags[index].iterator();
}
@@ -532,76 +369,82 @@ public class POPackage extends PhysicalO
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
}
- };
+ };
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return pkgr.illustratorMarkup(in, out, eqClassIndex);
+ }
+
+ public int numberOfEquivalenceClasses() {
+ return pkgr.numberOfEquivalenceClasses();
+ }
+
+ // A ReadOnceBag that we've already "peeked" at
+ private static class PeekedBag extends ReadOnceBag {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ NullableTuple head;
+ int index;
+
+ public PeekedBag(Packager pkgr, NullableTuple head,
+ Iterator<NullableTuple> tupIter,
+ PigNullableWritable keyWritable) {
+ super(pkgr, tupIter, keyWritable);
+ this.head = head;
+ this.index = head.getIndex();
+ }
- private Tuple illustratorMarkup2(Object in, Object out) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- illustrator.getLineage().insert(tOut);
- tOut.synthetic = ((ExampleTuple) in).synthetic;
- illustrator.getLineage().union(tOut, (Tuple) in);
- return tOut;
- } else
- return (Tuple) out;
- }
-
- @Override
- public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- LineageTracer lineageTracer = illustrator.getLineage();
- lineageTracer.insert(tOut);
- Tuple tmp;
- boolean synthetic = false;
- if (illustrator.getEquivalenceClasses() == null) {
- LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
- for (int i = 0; i < numInputs; ++i) {
- IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
- equivalenceClasses.add(equivalenceClass);
- }
- illustrator.setEquivalenceClasses(equivalenceClasses, this);
- }
-
- if (distinct) {
- int count;
- for (count = 0; tupIter.hasNext(); ++count) {
- NullableTuple ntp = tupIter.next();
- tmp = (Tuple) ntp.getValueAsPigType();
- if (!tmp.equals(tOut))
- lineageTracer.union(tOut, tmp);
- }
- if (count > 1) // only non-distinct tuples are inserted into the equivalence class
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- illustrator.addData((Tuple) tOut);
- return (Tuple) tOut;
- }
- boolean outInEqClass = true;
- try {
- for (int i = 1; i < numInputs+1; i++)
- {
- DataBag dbs = (DataBag) ((Tuple) out).get(i);
- Iterator<Tuple> iter = dbs.iterator();
- if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
- outInEqClass = false;
- while (iter.hasNext()) {
- tmp = iter.next();
- // any of synthetic data in bags causes the output tuple to be synthetic
- if (!synthetic && ((ExampleTuple)tmp).synthetic)
- synthetic = true;
- lineageTracer.union(tOut, tmp);
- }
- }
- } catch (ExecException e) {
- // TODO better exception handling
- throw new RuntimeException("Illustrator exception :"+e.getMessage());
- }
- if (outInEqClass)
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- tOut.synthetic = synthetic;
- illustrator.addData((Tuple) tOut);
- return tOut;
- } else
- return (Tuple) out;
- }
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new Iterator<Tuple>() {
+ boolean headReturned = false;
+
+ @Override
+ public boolean hasNext() {
+ if (!headReturned)
+ return true;
+
+ return tupIter.hasNext();
+ }
+ // next() rethrows ExecException unchecked; the original exception is kept as the cause.
+ @Override
+ public Tuple next() {
+ if (!headReturned) {
+ headReturned = true;
+ try {
+ return pkgr.getValueTuple(keyWritable, head,
+ head.getIndex());
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "PeekedBag failed to get value tuple : "
+ + e.toString(), e);
+ }
+ }
+ NullableTuple ntup = tupIter.next();
+ Tuple ret = null;
+ try {
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "PeekedBag failed to get value tuple : "
+ + e.toString(), e);
+ }
+ if (getReporter() != null) {
+ getReporter().progress();
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(
+ "PeekedBag does not support removal");
+ }
+ };
+ }
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sun Apr 6 10:54:13 2014
@@ -229,7 +229,7 @@ public class POPartialAgg extends Physic
}
}
avgTupleSize = estTotalMem / estTuples;
- int totalTuples = memLimits.getCacheLimit();
+ long totalTuples = memLimits.getCacheLimit();
LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Sun Apr 6 10:54:13 2014
@@ -0,0 +1,486 @@
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.Illustrable;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+public class Packager implements Illustrable, Serializable, Cloneable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ protected boolean[] readOnce;
+
+ protected DataBag[] bags;
+
+ public static enum PackageType {
+ GROUP, JOIN
+ };
+
+ protected transient Illustrator illustrator = null;
+
+ // The key being worked on
+ Object key;
+
+ // marker to indicate if key is a tuple
+ protected boolean isKeyTuple = false;
+ // marker to indicate if the tuple key is compound in nature
+ protected boolean isKeyCompound = false;
+
+ // key's type
+ byte keyType;
+
+ // The number of inputs to this
+ // co-group. 0 indicates a distinct, which means there will only be a
+ // key, no value.
+ int numInputs;
+
+ // If the attaching map-reduce plan use secondary sort key
+ boolean useSecondaryKey = false;
+
+ // Denotes if inner is specified
+ // on a particular input
+ boolean[] inner;
+
+ // flag to denote whether there is a distinct
+ // leading to this package
+ protected boolean distinct = false;
+
+ // A mapping of input index to key information got from LORearrange
+ // for that index. The Key information is a pair of boolean, Map.
+ // The boolean indicates whether there is a lone project(*) in the
+ // cogroup by. If not, the Map has a mapping of column numbers in the
+ // "value" to column numbers in the "key" which contain the fields in
+ // the "value"
+ protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+ private PackageType pkgType;
+
+ boolean firstTime = true;
+ boolean useDefaultBag = false;
+
+ protected POPackage parent = null;
+
+ protected static final BagFactory mBagFactory = BagFactory.getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory
+ .getInstance();
+
+ public Object getKey(PigNullableWritable key) throws ExecException {
+ Object keyObject = key.getValueAsPigType();
+ if (useSecondaryKey) {
+ return ((Tuple) keyObject).get(0);
+ } else {
+ return keyObject;
+ }
+ }
+
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ checkBagType();
+
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // We assume that we need all bags materialized. Specialized subclasses
+ // may choose to handle this differently
+ for (int i = 0; i < bags.length; i++) {
+ if (readOnce[i]) {
+ DataBag materializedBag = getBag();
+ materializedBag.addAll(bags[i]);
+ bags[i] = materializedBag;
+ }
+ }
+ }
+
+ public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+ Tuple res;
+
+ if (isDistinct()) {
+ // only set the key which has the whole
+ // tuple
+ res = mTupleFactory.newTuple(1);
+ res.set(0, key);
+ } else {
+ // Construct the output tuple by appending
+ // the key and all the above constructed bags
+ // and return it.
+ res = mTupleFactory.newTuple(numInputs + 1);
+ res.set(0, key);
+ int i = -1;
+ for (DataBag bag : bags) {
+ i++;
+ if (inner[i]) {
+ if (bag.size() == 0) {
+ detachInput();
+ Result r = new Result();
+ r.returnStatus = POStatus.STATUS_NULL;
+ return r;
+ }
+ }
+
+ res.set(i + 1, bag);
+ }
+ }
+ Result r = new Result();
+ r.returnStatus = POStatus.STATUS_OK;
+ r.result = illustratorMarkup(null, res, 0);
+ detachInput();
+ return r;
+ }
+
+ public void detachInput() {
+ key = null;
+ bags = null;
+ }
+
+ protected Tuple illustratorMarkup2(Object in, Object out) {
+ if (illustrator != null) {
+ ExampleTuple tOut;
+ if (!(out instanceof ExampleTuple)) {
+ tOut = new ExampleTuple((Tuple) out);
+ } else {
+ tOut = (ExampleTuple) out;
+ }
+ illustrator.getLineage().insert(tOut);
+ tOut.synthetic = ((ExampleTuple) in).synthetic;
+ illustrator.getLineage().union(tOut, (Tuple) in);
+ return tOut;
+ } else
+ return (Tuple) out;
+ }
+
+ protected Tuple starMarkup(Tuple key, Tuple val, Tuple out){
+ if (illustrator != null){
+ Tuple copy = illustratorMarkup2(key, out);
+ // For distinct, also union the value's lineage into the same marked-up tuple (reuse copy, not out).
+ if (isDistinct())
+ copy = illustratorMarkup2(val, copy);
+ return copy;
+ } else
+ return (Tuple) out;
+ }
+
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ Object key = getKey(keyWritable);
+ // Need to make a copy of the value, as hadoop uses the same ntup
+ // to represent each value.
+ Tuple val = (Tuple) ntup.getValueAsPigType();
+
+ Tuple copy = null;
+ // The "value (val)" that we just got may not
+ // be the complete "value". It may have some portions
+ // in the "key" (look in POLocalRearrange for more comments)
+ // If this is the case we need to stitch
+ // the "value" together.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = keyInfo.get(index);
+ boolean isProjectStar = lrKeyInfo.first;
+ Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+ int keyLookupSize = keyLookup.size();
+
+ if (keyLookupSize > 0) {
+
+ // we have some fields of the "value" in the
+ // "key".
+ int finalValueSize = keyLookupSize + val.size();
+ copy = mTupleFactory.newTuple(finalValueSize);
+ int valIndex = 0; // an index for accessing elements from
+ // the value (val) that we have currently
+ for (int i = 0; i < finalValueSize; i++) {
+ Integer keyIndex = keyLookup.get(i);
+ if (keyIndex == null) {
+ // the field for this index is not in the
+ // key - so just take it from the "value"
+ // we were handed
+ copy.set(i, val.get(valIndex));
+ valIndex++;
+ } else {
+ // the field for this index is in the key
+ if (isKeyTuple && isKeyCompound) {
+ // the key is a tuple, extract the
+ // field out of the tuple
+ copy.set(i, ((Tuple) key).get(keyIndex));
+ } else {
+ copy.set(i, key);
+ }
+ }
+ }
+ copy = illustratorMarkup2(val, copy);
+ } else if (isProjectStar) {
+
+ // the whole "value" is present in the "key"
+ copy = mTupleFactory.newTuple(((Tuple) key).getAll());
+ copy = starMarkup((Tuple) key, val, copy);
+ } else {
+
+ // there is no field of the "value" in the
+ // "key" - so just make a copy of what we got
+ // as the "value"
+ copy = mTupleFactory.newTuple(val.getAll());
+ copy = illustratorMarkup2(val, copy);
+ }
+ return copy;
+ }
+
+ public byte getKeyType() {
+ return keyType;
+ }
+
+ public void setKeyType(byte keyType) {
+ this.keyType = keyType;
+ }
+
+ /**
+ * @return the isKeyTuple
+ */
+ public boolean getKeyTuple() {
+ return isKeyTuple;
+ }
+
+ /**
+ * @return the keyAsTuple
+ */
+ public Tuple getKeyAsTuple() {
+ return isKeyTuple ? (Tuple) key : null;
+ }
+
+ /**
+ * @return the key
+ */
+ public Object getKey() {
+ return key;
+ }
+
+ public boolean[] getInner() {
+ return inner;
+ }
+
+ public void setInner(boolean[] inner) {
+ this.inner = inner;
+ }
+
+ /**
+ * @param keyInfo
+ * the keyInfo to set
+ */
+ public void setKeyInfo(
+ Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+ this.keyInfo = keyInfo;
+ }
+
+ /**
+ * @param keyTuple
+ * the keyTuple to set
+ */
+ public void setKeyTuple(boolean keyTuple) {
+ this.isKeyTuple = keyTuple;
+ }
+
+ /**
+ * @param keyCompound
+ * the keyCompound to set
+ */
+ public void setKeyCompound(boolean keyCompound) {
+ this.isKeyCompound = keyCompound;
+ }
+
+ /**
+ * @return the keyInfo
+ */
+ public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
+ return keyInfo;
+ }
+
+ public Illustrator getIllustrator() {
+ return illustrator;
+ }
+
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ this.illustrator = illustrator;
+ }
+
+ /**
+ * @return the distinct
+ */
+ public boolean isDistinct() {
+ return distinct;
+ }
+
+ /**
+ * @param distinct
+ * the distinct to set
+ */
+ public void setDistinct(boolean distinct) {
+ this.distinct = distinct;
+ }
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+
+ public void setPackageType(PackageType type) {
+ this.pkgType = type;
+ }
+
+ public PackageType getPackageType() {
+ return pkgType;
+ }
+
+ public int getNumInputs(byte index) {
+ return numInputs;
+ }
+
+ public int getNumInputs() {
+ return numInputs;
+ }
+
+ public void setNumInputs(int numInputs) {
+ this.numInputs = numInputs;
+ }
+
+ @Override
+ public Packager clone() throws CloneNotSupportedException {
+ Packager clone = (Packager) super.clone();
+ clone.setNumInputs(numInputs);
+ clone.setPackageType(pkgType);
+ clone.setDistinct(distinct);
+ if (inner != null) {
+ clone.inner = new boolean[inner.length];
+ for (int i = 0; i < inner.length; i++) {
+ clone.inner[i] = inner[i];
+ }
+ } else
+ clone.inner = null;
+ if (keyInfo != null)
+ clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
+ keyInfo));
+ clone.setKeyCompound(isKeyCompound);
+ clone.setKeyTuple(isKeyTuple);
+ clone.setKeyType(keyType);
+ clone.setUseSecondaryKey(useSecondaryKey);
+ return clone;
+ }
+
+ public String name() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // All customized packagers are introduced during MRCompilation.
+ // Illustrate happens before that, so we only have to focus on the basic
+ // POPackage
+ if (illustrator != null) {
+ ExampleTuple tOut = new ExampleTuple((Tuple) out);
+ LineageTracer lineageTracer = illustrator.getLineage();
+ lineageTracer.insert(tOut);
+ boolean synthetic = false;
+ if (illustrator.getEquivalenceClasses() == null) {
+ LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ for (int i = 0; i < numInputs; ++i) {
+ IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+ equivalenceClasses.add(equivalenceClass);
+ }
+ illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+ }
+
+ if (isDistinct()) {
+ int count = 0;
+ for (Tuple tmp : bags[0]){
+ count++;
+ if (!tmp.equals(tOut))
+ lineageTracer.union(tOut, tmp);
+ }
+ if (count > 1) // only non-distinct tuples are inserted into the
+ // equivalence class
+ illustrator.getEquivalenceClasses().get(eqClassIndex)
+ .add(tOut);
+ illustrator.addData((Tuple) tOut);
+ return (Tuple) tOut;
+ }
+ boolean outInEqClass = true;
+ try {
+ for (int i = 1; i < numInputs + 1; i++) {
+ DataBag dbs = (DataBag) ((Tuple) out).get(i);
+ Iterator<Tuple> iter = dbs.iterator();
+ if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2
+ // records
+ outInEqClass = false;
+ while (iter.hasNext()) {
+ Tuple tmp = iter.next();
+ // any of synthetic data in bags causes the output tuple
+ // to be synthetic
+ if (!synthetic && ((ExampleTuple) tmp).synthetic)
+ synthetic = true;
+ lineageTracer.union(tOut, tmp);
+ }
+ }
+ } catch (ExecException e) {
+ // TODO better exception handling
+ throw new RuntimeException("Illustrator exception :"
+ + e.getMessage());
+ }
+ if (outInEqClass)
+ illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+ tOut.synthetic = synthetic;
+ illustrator.addData((Tuple) tOut);
+ return tOut;
+ } else
+ return (Tuple) out;
+ }
+
+ public void setParent(POPackage pack) {
+ parent = pack;
+ }
+
+ public int numberOfEquivalenceClasses() {
+ return 1;
+ }
+
+ public void checkBagType() {
+ if(firstTime){
+ firstTime = false;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
+ }
+
+ public DataBag getBag(){
+ return useDefaultBag ? mBagFactory.newDefaultBag()
+ // In a very rare case if there is a POStream after this
+ // POJoinPackage in the pipeline and is also blocking the pipeline;
+ // constructor argument should be 2 * numInputs. But for one obscure
+ // case we don't want to pay the penalty all the time.
+ : new InternalCachedBag(numInputs-1);
+ }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Sun Apr 6 10:54:13 2014
@@ -51,7 +51,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
@@ -59,13 +58,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -209,20 +206,6 @@ public class PlanHelper {
}
@Override
- public void visitCombinerPackage(POCombinerPackage pkg)
- throws VisitorException {
- super.visitCombinerPackage(pkg);
- visit(pkg);
- }
-
- @Override
- public void visitMultiQueryPackage(POMultiQueryPackage pkg)
- throws VisitorException {
- super.visitMultiQueryPackage(pkg);
- visit(pkg);
- }
-
- @Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
super.visitPOForEach(nfe);
visit(nfe);
@@ -400,13 +383,6 @@ public class PlanHelper {
}
@Override
- public void visitJoinPackage(POJoinPackage joinPackage)
- throws VisitorException {
- super.visitJoinPackage(joinPackage);
- visit(joinPackage);
- }
-
- @Override
public void visitCast(POCast cast) {
super.visitCast(cast);
visit(cast);
@@ -449,7 +425,6 @@ public class PlanHelper {
visit(stream);
}
- @Override
public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
super.visitSkewedJoin(sk);
visit(sk);
Modified: pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Sun Apr 6 10:54:13 2014
@@ -22,30 +22,28 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
+
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
/**
- * This bag is specifically created for use by POPackageLite. So it has three
- * properties, the NullableTuple iterator, the key (Object) and the keyInfo
- * (Map<Integer, Pair<Boolean, Map<Integer, Integer>>>) all three
- * of which are required in the constructor call. This bag does not store
- * the tuples in memory, but has access to an iterator typically provided by
- * Hadoop. Use this when you already have an iterator over tuples and do not
- * want to copy over again to a new bag.
+ * This bag does not store the tuples in memory, but has access to an iterator
+ * typically provided by Hadoop. Use this when you already have an iterator over
+ * tuples and do not want to copy over again to a new bag.
*/
public class ReadOnceBag implements DataBag {
- // The Package operator that created this
- POPackageLite pkg;
+ // The Packager that created this
+ protected Packager pkgr;
//The iterator of Tuples. Marked transient because we will never serialize this.
- transient Iterator<NullableTuple> tupIter;
+ protected transient Iterator<NullableTuple> tupIter;
// The key being worked on
- Object key;
+ protected PigNullableWritable keyWritable;
/**
*
@@ -60,10 +58,11 @@ public class ReadOnceBag implements Data
* @param tupIter Iterator<NullableTuple>
* @param key Object
*/
- public ReadOnceBag(POPackageLite pkg, Iterator<NullableTuple> tupIter, Object key) {
- this.pkg = pkg;
+ public ReadOnceBag(Packager pkgr, Iterator<NullableTuple> tupIter,
+ PigNullableWritable keyWritable) {
+ this.pkgr = pkgr;
this.tupIter = tupIter;
- this.key = key;
+ this.keyWritable = keyWritable;
}
/* (non-Javadoc)
@@ -177,27 +176,23 @@ public class ReadOnceBag implements Data
@Override
public boolean equals(Object other) {
- if(other instanceof ReadOnceBag)
- {
- if(pkg.getKeyTuple())
- {
- if(tupIter == ((ReadOnceBag)other).tupIter && pkg.getKeyTuple() == ((ReadOnceBag)other).pkg.getKeyTuple() && pkg.getKeyAsTuple().equals(((ReadOnceBag)other).pkg.getKeyAsTuple()))
- {
+ if (other instanceof ReadOnceBag) {
+ if (pkgr.getKeyTuple()) {
+ if (tupIter == ((ReadOnceBag) other).tupIter
+ && pkgr.getKeyTuple() == ((ReadOnceBag) other).pkgr
+ .getKeyTuple()
+ && pkgr.getKeyAsTuple().equals(
+ ((ReadOnceBag) other).pkgr.getKeyAsTuple())) {
return true;
- }
- else
- {
+ } else {
return false;
}
- }
- else
- {
- if(tupIter == ((ReadOnceBag)other).tupIter && pkg.getKey().equals(((ReadOnceBag)other).pkg.getKey()))
- {
+ } else {
+ if (tupIter == ((ReadOnceBag) other).tupIter
+ && pkgr.getKey().equals(
+ ((ReadOnceBag) other).pkgr.getKey())) {
return true;
- }
- else
- {
+ } else {
return false;
}
}
@@ -208,18 +203,18 @@ public class ReadOnceBag implements Data
@Override
public int hashCode() {
int hash = 7;
- if(pkg.getKeyTuple())
+ if (pkgr.getKeyTuple())
{
- hash = hash*31 + pkg.getKeyAsTuple().hashCode();
+ hash = hash * 31 + pkgr.getKeyAsTuple().hashCode();
}
else
{
- hash = hash*31 + pkg.getKey().hashCode();
+ hash = hash * 31 + pkgr.getKey().hashCode();
}
return hash;
}
- class ReadOnceBagIterator implements Iterator<Tuple>
+ protected class ReadOnceBagIterator implements Iterator<Tuple>
{
/* (non-Javadoc)
* @see java.util.Iterator#hasNext()
@@ -238,7 +233,7 @@ public class ReadOnceBag implements Data
int index = ntup.getIndex();
Tuple ret = null;
try {
- ret = pkg.getValueTuple(ntup, index, key);
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
} catch (ExecException e)
{
throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
Modified: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Sun Apr 6 10:54:13 2014
@@ -53,10 +53,10 @@ public abstract class SelfSpillBag exten
public static class MemoryLimits {
private long maxMemUsage;
- private int cacheLimit = Integer.MAX_VALUE;
+ private long cacheLimit = Integer.MAX_VALUE;
private long memUsage = 0;
private long numObjsSizeChecked = 0;
-
+
private static float cachedMemUsage = 0.2F;
private static long maxMem = 0;
static {
@@ -99,11 +99,11 @@ public abstract class SelfSpillBag exten
*
* @return number of objects limit
*/
- public int getCacheLimit() {
+ public long getCacheLimit() {
if (numObjsSizeChecked > 0) {
long avgUsage = memUsage / numObjsSizeChecked;
if (avgUsage > 0) {
- cacheLimit = (int) (maxMemUsage / avgUsage);
+ cacheLimit = maxMemUsage / avgUsage;
}
}
return cacheLimit;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Sun Apr 6 10:54:13 2014
@@ -56,7 +56,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -64,6 +63,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFrontend;
@@ -357,7 +357,7 @@ public class LogToPhyTranslationVisitor
expressionPlans.put(i,loRank.getRankColPlans());
POPackage poPackage = compileToLR_GR_PackTrio(loRank, null, flags, expressionPlans);
- poPackage.setPackageType(PackageType.GROUP);
+ poPackage.getPkgr().setPackageType(PackageType.GROUP);
translateSoftLinks(loRank);
List<Boolean> flattenLst = Arrays.asList(true, false);
@@ -366,7 +366,7 @@ public class LogToPhyTranslationVisitor
POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
feproj1.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
feproj1.setColumn(0);
- feproj1.setResultType(poPackage.getKeyType());
+ feproj1.setResultType(poPackage.getPkgr().getKeyType());
feproj1.setStar(false);
feproj1.setOverloaded(false);
fep1.add(feproj1);
@@ -667,14 +667,14 @@ public class LogToPhyTranslationVisitor
throw new VisitorException(msg, errCode, PigException.BUG, e);
}
- poPackage.setKeyType(DataType.TUPLE);
+ poPackage.getPkgr().setKeyType(DataType.TUPLE);
poPackage.setResultType(DataType.TUPLE);
poPackage.setNumInps(count);
boolean inner[] = new boolean[count];
for (int i=0;i<count;i++) {
inner[i] = true;
}
- poPackage.setInner(inner);
+ poPackage.getPkgr().setInner(inner);
List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
List<Boolean> flattenLst = new ArrayList<Boolean>();
@@ -999,7 +999,7 @@ public class LogToPhyTranslationVisitor
break;
case REGULAR:
POPackage poPackage = compileToLR_GR_PackTrio(cg, cg.getCustomPartitioner(), cg.getInner(), cg.getExpressionPlans());
- poPackage.setPackageType(PackageType.GROUP);
+ poPackage.getPkgr().setPackageType(PackageType.GROUP);
logToPhyMap.put(cg, poPackage);
break;
case MERGE:
@@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor
e.getErrorCode(),e.getErrorSource(),e);
}
logToPhyMap.put(loj, fe);
- poPackage.setPackageType(POPackage.PackageType.JOIN);
+ poPackage.getPkgr().setPackageType(PackageType.JOIN);
}
translateSoftLinks(loj);
}
@@ -1485,10 +1485,10 @@ public class LogToPhyTranslationVisitor
}
}
- poPackage.setKeyType(type);
+ poPackage.getPkgr().setKeyType(type);
poPackage.setResultType(DataType.TUPLE);
poPackage.setNumInps(count);
- poPackage.setInner(innerFlags);
+ poPackage.getPkgr().setInner(innerFlags);
return poPackage;
}