Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/06 12:54:14 UTC

svn commit: r1585283 [2/3] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/fetch/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/ap...

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Sun Apr  6 10:54:13 2014
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.pen.Illustrator;
+
+public class JoinPackager extends Packager {
+
+    private POOptimizedForEach forEach;
+    private boolean newKey = true;
+    private Tuple res = null;
+    private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+
+    public static final String DEFAULT_CHUNK_SIZE = "1000";
+
+    private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE);
+    private Result forEachResult;
+    private DataBag[] dbs = null;
+
+    private int lastBagIndex;
+
+    private Iterator<Tuple> lastBagIter;
+
+    public JoinPackager(Packager p, POForEach f) {
+        super();
+        String scope = f.getOperatorKey().getScope();
+        NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+        forEach = new POOptimizedForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        if (p!=null)
+        {
+            setKeyType(p.getKeyType());
+            setNumInputs(p.getNumInputs());
+            lastBagIndex = numInputs - 1;
+            setInner(p.getInner());
+            setKeyInfo(p.getKeyInfo());
+            this.isKeyTuple = p.isKeyTuple;
+            this.isKeyCompound = p.isKeyCompound;
+        }
+        if (f!=null)
+        {
+            setInputPlans(f.getInputPlans());
+            setToBeFlattened(f.getToBeFlattened());
+        }
+    }
+
+    /**
+     * Calls getNext to get the next ForEach result. The input for POJoinPackage is
+     * a (key, NullableTuple) pair. We materialize n-1 inputs into bags and feed input#n
+     * one tuple at a time to the delegated ForEach operator, so the input for ForEach is
+     *
+     *     (input#1, input#2, input#3....input#n[i]), i=(1..k), assuming input#n consists
+     *
+     * of k tuples.
+     * For every ForEach input, pull all the results from ForEach.
+     * getNext will be called multiple times for a particular input;
+     * it returns one output tuple from ForEach on each call,
+     * so we need to maintain internal state to keep track of where we are.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        Tuple it = null;
+
+        // If we see a new NullableTupleIterator, materialize n-1 inputs and construct the ForEach input
+        // tuple res = (key, input#1, input#2....input#n); the only missing value is input#n,
+        // which we will get one tuple at a time, fill into res, and feed to ForEach.
+        // After this block, we have the first tuple of input#n in hand (kept in variable it)
+        if (newKey)
+        {
+            // Put n-1 inputs into bags
+            dbs = new DataBag[numInputs];
+            for (int i = 0; i < numInputs - 1; i++) {
+                dbs[i] = bags[i];
+            }
+
+            // For the last bag, we always use a NonSpillableDataBag.
+            dbs[lastBagIndex] = new NonSpillableDataBag((int)chunkSize);
+
+            lastBagIter = bags[lastBagIndex].iterator();
+
+            // If we don't have any tuple for input#n
+            // we do not need any further processing, return EOP
+            if (!lastBagIter.hasNext()) {
+                // we will return at this point because we ought
+                // to be having a flatten on this last input
+                // and we have an empty bag which should result
+                // in this key being taken out of the output
+                newKey = true;
+                return eopResult;
+            }
+
+            res = mTupleFactory.newTuple(numInputs+1);
+            for (int i = 0; i < dbs.length; i++)
+                res.set(i+1,dbs[i]);
+
+            res.set(0,key);
+            // if we have an inner anywhere and the corresponding
+            // bag is empty, we can just return
+            for (int i = 0; i < dbs.length - 1; i++) {
+                if(inner[i]&&dbs[i].size()==0){
+                    detachInput();
+                    return eopResult;
+                }
+            }
+            newKey = false;
+        }
+
+        // Keep attaching input tuples to ForEach until either:
+        // 1. we can initialize ForEach.getNext(); or
+        // 2. there is no more of input#n
+        while (lastBagIter.hasNext() || forEach.processingPlan) {
+            // if a previous call to foreach.getNext()
+            // has still not returned all output, process it
+            while (forEach.processingPlan) {
+                forEachResult = forEach.getNextTuple();
+                switch (forEachResult.returnStatus) {
+                case POStatus.STATUS_OK:
+                case POStatus.STATUS_ERR:
+                    return forEachResult;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    break;
+                }
+            }
+
+            if (lastBagIter.hasNext()) {
+                // try setting up a bag of CHUNKSIZE tuples, OR
+                // the remainder of the last input's bag
+                // (if < CHUNKSIZE), to feed to foreach
+                dbs[lastBagIndex].clear(); // clear last chunk
+                for (int i = 0; i < chunkSize && lastBagIter.hasNext(); i++) {
+                    it = lastBagIter.next();
+                    dbs[lastBagIndex].add(it);
+                }
+            } else {
+                detachInput();
+                return eopResult;
+            }
+
+            // Attach the input to forEach
+            forEach.attachInput(res);
+
+            // pull output tuple from ForEach
+            Result forEachResult = forEach.getNextTuple();
+            {
+                switch (forEachResult.returnStatus) {
+                case POStatus.STATUS_OK:
+                case POStatus.STATUS_ERR:
+                    return forEachResult;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    break;
+                }
+            }
+        }
+        detachInput();
+        return eopResult;
+    }
+
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        checkBagType();
+
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // JoinPackager expects all but the last bag to be materialized
+        for (int i = 0; i < bags.length - 1; i++) {
+            if (readOnce[i]) {
+                DataBag materializedBag = getBag();
+                materializedBag.addAll(bags[i]);
+                bags[i] = materializedBag;
+            }
+        }
+        if (readOnce[numInputs - 1] != true) {
+            throw new ExecException(
+                    "JoinPackager expects the last input to be streamed");
+        }
+        this.newKey = true;
+    }
+
+    public List<PhysicalPlan> getInputPlans() {
+        return forEach.getInputPlans();
+    }
+
+    public void setInputPlans(List<PhysicalPlan> plans) {
+        forEach.setInputPlans(plans);
+    }
+
+    public void setToBeFlattened(List<Boolean> flattens) {
+        forEach.setToBeFlattened(flattens);
+    }
+
+    /**
+     * @return the forEach
+     */
+    public POOptimizedForEach getForEach() {
+        return forEach;
+    }
+
+    /**
+     * @param chunkSize - the chunk size for the biggest input
+     */
+    public void setChunkSize(long chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        this.illustrator = illustrator;
+        forEach.setIllustrator(illustrator);
+    }
+
+    @Override
+    public String name() {
+        return this.getClass().getSimpleName() + "(" + forEach.getFlatStr() + ")";
+    }
+}
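The getNext() loop above implements the chunking that the class comment describes: the first n-1 inputs stay materialized in bags while the last, largest input is pushed through the delegated ForEach one chunk of CHUNK_SIZE tuples at a time. The following standalone sketch shows the same pattern in isolation; it is illustrative only, not Pig code, and the names ChunkedJoinSketch, JoinBody and CHUNK_SIZE are invented for the example.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class ChunkedJoinSketch {
        static final int CHUNK_SIZE = 1000;   // mirrors DEFAULT_CHUNK_SIZE above

        // Stand-in for the delegated ForEach plan: per invocation it sees the key,
        // the n-1 materialized bags and one chunk of the last input.
        interface JoinBody {
            void process(Object key, List<List<Object>> smallBags, List<Object> chunk);
        }

        static void join(Object key, List<List<Object>> smallBags,
                         Iterator<Object> lastInput, JoinBody body) {
            List<Object> chunk = new ArrayList<>(CHUNK_SIZE);
            while (lastInput.hasNext()) {
                chunk.clear();                               // reuse the chunk buffer
                for (int i = 0; i < CHUNK_SIZE && lastInput.hasNext(); i++) {
                    chunk.add(lastInput.next());
                }
                body.process(key, smallBags, chunk);         // one ForEach pass per chunk
            }
        }
    }

Keeping only one chunk of the last input resident is what allows the largest join input to exceed memory, which is why that slot uses a NonSpillableDataBag sized to the chunk.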

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Sun Apr  6 10:54:13 2014
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * This packager is a specialization
+ * of the default Packager used for the specific
+ * case of the order by query. See JIRA 802
+ * for more details.
+ */
+public class LitePackager extends Packager {
+
+    private PigNullableWritable keyWritable;
+
+    @Override
+    public boolean[] getInner() {
+        return null;
+    }
+
+    @Override
+    public void setInner(boolean[] inner) {
+    }
+
+    /**
+     * Make a deep copy of this operator.
+     * @throws CloneNotSupportedException
+     */
+    @Override
+    public LitePackager clone() throws CloneNotSupportedException {
+        LitePackager clone = (LitePackager) super.clone();
+        clone.inner = null;
+        if (keyInfo != null) {
+            clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(keyInfo);
+        }
+        return clone;
+    }
+
+    /**
+     * @return the distinct flag (always false for LitePackager)
+     */
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+
+    /**
+     * @param distinct
+     *            the distinct to set
+     */
+    @Override
+    public void setDistinct(boolean distinct) {
+    }
+
+    /**
+     * Similar to POPackage.getNext except that
+     * only one input is expected with index 0
+     * and ReadOnceBag is used instead of
+     * DefaultDataBag.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        Tuple res;
+
+        //Construct the output tuple by appending
+        //the key and the single input bag,
+        //and return it.
+        res = mTupleFactory.newTuple(numInputs+1);
+        res.set(0,key);
+        res.set(1, bags[0]);
+        detachInput();
+        Result r = new Result();
+        r.returnStatus = POStatus.STATUS_OK;
+        r.result = illustratorMarkup(null, res, 0);
+        return r;
+    }
+
+    /**
+     * Delegates to the superclass method, but takes an additional key
+     * parameter passed in by ReadOnceBag. The key of this instance is set to
+     * null in the detachInput call, but an instance of ReadOnceBag may still
+     * hold the original key that it uses. This extra argument is therefore
+     * used to temporarily set the key before the call to the superclass
+     * method and then restore it.
+     */
+    @Override
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        PigNullableWritable origKey = this.keyWritable;
+        this.keyWritable = keyWritable;
+        Tuple retTuple = super.getValueTuple(keyWritable, ntup, index);
+        this.keyWritable = origKey;
+        return retTuple;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            if (illustrator.getEquivalenceClasses() == null) {
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < numInputs; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+            }
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            tOut.synthetic = false; // we do not expect this to actually be used
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
+}
+
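getValueTuple above temporarily swaps in the key passed by ReadOnceBag, delegates to the superclass, and then puts the old key back, since detachInput() may already have cleared this instance's own key. A minimal, self-contained sketch of that save/restore-around-delegation pattern follows; the class and method names are invented for illustration, and unlike the real method it uses try/finally for the restore.

    import java.util.function.Supplier;

    public class SaveRestoreSketch {
        private Object currentKey;

        // Run "body" while "temporaryKey" is visible through currentKey,
        // then restore whatever key was held before.
        Object withTemporaryKey(Object temporaryKey, Supplier<Object> body) {
            Object saved = currentKey;
            currentKey = temporaryKey;
            try {
                return body.get();
            } finally {
                currentKey = saved;
            }
        }
    }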

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Sun Apr  6 10:54:13 2014
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.NullableUnknownWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * The package operator that packages the globally rearranged tuples
+ * into output format as required by multi-query de-multiplexer.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization.
+ * The package operators of the reduce plans of the splittees form an
+ * indexed package list inside this operator. When this operator
+ * receives an input, it extracts the index from the key and calls the
+ * corresponding package to get the output data.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another multi-query packager.
+ * <p>
+ * The successor of this operator must be a PODemux operator which
+ * knows how to consume the output of this operator.
+ */
+public class MultiQueryPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private static int idxPart = 0x7F;
+
+    private List<Packager> packagers = new ArrayList<Packager>();
+
+    /**
+     * If the POLocalRearranges corresponding to the reduce plans in
+     * myPlans (the list of inner plans of the demux) have different key types,
+     * then the MultiQueryOptimizer converts all the keys to type tuple
+     * by wrapping any non-tuple keys into Tuples (keys which are already tuples
+     * are left alone).
+     * The list below holds booleans indicating whether extra tuple wrapping
+     * was done for the key in the corresponding POLocalRearrange and hence
+     * whether we need to "unwrap" the tuple to get to the key.
+     */
+    private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+
+    /*
+     * Indicates whether all the inner plans have the same
+     * map key type. If not, the keys passed in are
+     * wrapped inside tuples and need to be extracted
+     * out during the reduce phase
+     */
+    private boolean sameMapKeyType = true;
+
+    /*
+     * Indicates whether this operator is in a combiner.
+     * If not, this operator is in a reducer and the key
+     * values must first be extracted from the wrapping tuple
+     * before being written out to disk
+     */
+    private boolean inCombiner = false;
+
+    private PigNullableWritable keyWritable = null;
+
+    /**
+     * Appends the specified packager to the end of
+     * the packager list.
+     *
+     * @param pkgr packager to be appended to the list
+     */
+    public void addPackager(Packager pkgr) {
+        packagers.add(pkgr);
+    }
+
+    /**
+     * Appends the specified packager to the end of
+     * the packager list.
+     *
+     * @param pkgr packager to be appended to the list
+     * @param mapKeyType the map key type associated with the packager
+     */
+    public void addPackager(Packager pkgr, byte mapKeyType) {
+        packagers.add(pkgr);
+        // if mapKeyType is already a tuple, we will NOT
+        // be wrapping it in an extra tuple. If it is not
+        // a tuple, we will wrap it in a tuple
+        isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
+    }
+
+    /**
+     * Returns the list of packagers.
+     *
+     * @return the list of packagers
+     */
+    public List<Packager> getPackagers() {
+        return packagers;
+    }
+
+    /**
+     * Constructs the output tuple from the inputs.
+     * <p>
+     * The output is consumed by the demultiplexer operator
+     * (PODemux) in the format (key, {bag of tuples}) where key
+     * is an indexed WritableComparable, not the wrapped value as a pig type.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        byte origIndex = keyWritable.getIndex();
+
+        int index = (int)origIndex;
+        index &= idxPart;
+
+        if (index >= packagers.size() || index < 0) {
+            int errCode = 2140;
+            String msg = "Invalid package index " + index
+                    + " should be in the range between 0 and "
+                    + packagers.size();
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        Packager pkgr = packagers.get(index);
+
+        // check to see if we need to unwrap the key. The keys may be
+        // wrapped inside a tuple by LocalRearrange operator when jobs
+        // with different map key types are merged
+        PigNullableWritable curKey = keyWritable;
+        if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+            Tuple tup = (Tuple) keyWritable.getValueAsPigType();
+            curKey = HDataType.getWritableComparableTypes(tup.get(0),
+                    pkgr.getKeyType());
+            curKey.setIndex(origIndex);
+        }
+
+        pkgr.attachInput(curKey, bags, readOnce);
+
+        Result res = pkgr.getNext();
+        pkgr.detachInput();
+        detachInput();
+
+        Tuple tuple = (Tuple)res.result;
+
+        // the object present in the first field
+        // of the tuple above is the real data without
+        // index information - this is because the
+        // packager above extracts the real data out of
+        // the PigNullableWritable object - we are going to
+        // give this result tuple to a PODemux operator
+        // which needs a PigNullableWritable first field so
+        // it can figure out the index. Therefore we need
+        // to add index to the first field of the tuple.
+
+        Object obj = tuple.get(0);
+        if (obj instanceof PigNullableWritable) {
+            ((PigNullableWritable)obj).setIndex(origIndex);
+        }
+        else {
+            PigNullableWritable myObj = null;
+            if (obj == null) {
+                myObj = new NullableUnknownWritable();
+                myObj.setNull(true);
+            }
+            else {
+                myObj = HDataType.getWritableComparableTypes(obj, HDataType.findTypeFromNullableWritable(curKey));
+            }
+            myObj.setIndex(origIndex);
+            tuple.set(0, myObj);
+        }
+        // illustrator markup has been handled by "pkgr"
+        return res;
+    }
+
+    /**
+     * Returns the list of booleans that indicates whether the
+     * key needs to be unwrapped for the corresponding plan.
+     * 
+     * @return the list of isKeyWrapped boolean values
+     */
+    public List<Boolean> getIsKeyWrappedList() {
+        return Collections.unmodifiableList(isKeyWrapped);
+    }
+
+    /**
+     * Adds a list of isKeyWrapped boolean values.
+     * 
+     * @param lst the list of boolean values to add
+     */
+    public void addIsKeyWrappedList(List<Boolean> lst) {
+        for (Boolean b : lst) {
+            isKeyWrapped.add(b);
+        }
+    }
+
+    public void setInCombiner(boolean inCombiner) {
+        this.inCombiner = inCombiner;
+    }
+
+    public boolean isInCombiner() {
+        return inCombiner;
+    }
+
+    public void setSameMapKeyType(boolean sameMapKeyType) {
+        this.sameMapKeyType = sameMapKeyType;
+    }
+
+    public boolean isSameMapKeyType() {
+        return sameMapKeyType;
+    }
+
+    @Override
+    public int getNumInputs(byte index) {
+        return packagers.get(((int) index) & idxPart).getNumInputs(index);
+    }
+
+    @Override
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        this.keyWritable = keyWritable;
+        return packagers.get(((int) index) & idxPart).getValueTuple(
+                keyWritable, ntup, index);
+    }
+}
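The core of getNext() above is the index handling: the low bits of the record's index select which merged sub-packager owns the key, and the index is written back onto the first field of the result so the downstream PODemux can route it. A reduced sketch of that mask-and-dispatch step is below; DemuxDispatchSketch and SubHandler are invented names, not Pig classes.

    import java.util.List;

    public class DemuxDispatchSketch {
        static final int IDX_PART = 0x7F;        // same mask as idxPart above

        interface SubHandler {
            Object handle(Object key) throws Exception;
        }

        static Object dispatch(byte recordIndex, Object key, List<SubHandler> handlers)
                throws Exception {
            int index = recordIndex & IDX_PART;  // strip the multi-query flag bits
            if (index < 0 || index >= handlers.size()) {
                throw new IllegalStateException("Invalid package index " + index);
            }
            return handlers.get(index).handle(key);
        }
    }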

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Sun Apr  6 10:54:13 2014
@@ -21,32 +21,29 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.Illustrator;
 
 /**
  * The package operator that packages
@@ -67,50 +64,18 @@ public class POPackage extends PhysicalO
      */
     private static final long serialVersionUID = 1L;
 
-
-    public static enum PackageType { GROUP, JOIN };
-
     //The iterator of indexed Tuples
     //that is typically provided by
     //Hadoop
     transient Iterator<NullableTuple> tupIter;
 
     //The key being worked on
-    Object key;
-
-    // marker to indicate if key is a tuple
-    protected boolean isKeyTuple = false;
-    // marker to indicate if the tuple key is compound in nature
-    protected boolean isKeyCompound = false;
-    // key as a Tuple object (if the key is a tuple)
-    protected Tuple keyAsTuple;
-
-    //key's type
-    byte keyType;
+    protected Object key;
 
     //The number of inputs to this
     //co-group.  0 indicates a distinct, which means there will only be a
     //key, no value.
-    int numInputs;
-
-    // If the attaching map-reduce plan use secondary sort key
-    boolean useSecondaryKey = false;
-
-    //Denotes if inner is specified
-    //on a particular input
-    boolean[] inner;
-
-    // flag to denote whether there is a distinct
-    // leading to this package
-    protected boolean distinct = false;
-
-    // A mapping of input index to key information got from LORearrange
-    // for that index. The Key information is a pair of boolean, Map.
-    // The boolean indicates whether there is a lone project(*) in the
-    // cogroup by. If not, the Map has a mapping of column numbers in the
-    // "value" to column numbers in the "key" which contain the fields in
-    // the "value"
-    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+    protected int numInputs;
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -119,7 +84,9 @@ public class POPackage extends PhysicalO
 
     private boolean useDefaultBag = false;
 
-    private PackageType pkgType;
+    protected Packager pkgr;
+
+    private PigNullableWritable keyWritable;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -134,16 +101,27 @@ public class POPackage extends PhysicalO
     }
 
     public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        this(k, rp, inp, new Packager());
+    }
+
+    public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
+            Packager pkgr) {
         super(k, rp, inp);
         numInputs = -1;
-        keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+        this.pkgr = pkgr;
+    }
+
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        super.setIllustrator(illustrator);
+        pkgr.setIllustrator(illustrator);
     }
 
     @Override
     public String name() {
-        return getAliasString() + "Package" + "["
+        return getAliasString() + "Package" + "(" + pkgr.name() + ")" + "["
                 + DataType.findTypeName(resultType) + "]" + "{"
-                + DataType.findTypeName(keyType) + "}" + " - "
+                + DataType.findTypeName(pkgr.getKeyType()) + "}" + " - "
                 + mKey.toString();
     }
 
@@ -171,16 +149,9 @@ public class POPackage extends PhysicalO
     public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
         try {
             tupIter = inp;
-            key = k.getValueAsPigType();
-            if (useSecondaryKey) {
-                key = ((Tuple)key).get(0);
-
-            }
-            if(isKeyTuple) {
-                // key is a tuple, cache the key as a
-                // tuple for use in the getNext()
-                keyAsTuple = (Tuple)key;
-            }
+            key = pkgr.getKey(k);
+            keyWritable = k;
+            inputAttached = true;
         } catch (Exception e) {
             throw new RuntimeException(
                     "Error attaching input for key " + k +
@@ -191,9 +162,11 @@ public class POPackage extends PhysicalO
     /**
      * attachInput's better half!
      */
+    @Override
     public void detachInput() {
         tupIter = null;
         key = null;
+        inputAttached = false;
     }
 
     public int getNumInps() {
@@ -202,46 +175,38 @@ public class POPackage extends PhysicalO
 
     public void setNumInps(int numInps) {
         this.numInputs = numInps;
-    }
-
-    public boolean[] getInner() {
-        return inner;
-    }
-
-    public void setInner(boolean[] inner) {
-        this.inner = inner;
+        pkgr.setNumInputs(numInps);
     }
 
     /**
-     * From the inputs, constructs the output tuple
-     * for this co-group in the required format which
-     * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
+     * From the inputs, constructs the output tuple for this co-group in the
+     * required format which is (key, {bag of tuples from input 1}, {bag of
+     * tuples from input 2}, ...)
      */
     @Override
     public Result getNextTuple() throws ExecException {
-        Tuple res;
-
         if(firstTime){
             firstTime = false;
             if (PigMapReduce.sJobConfInternal.get() != null) {
-                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                String bagType = PigMapReduce.sJobConfInternal.get().get(
+                        "pig.cachedbag.type");
                 if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     useDefaultBag = true;
                 }
             }
         }
+        int numInputs = pkgr.getNumInputs(keyWritable.getIndex());
+        boolean[] readOnce = new boolean[numInputs];
+        for (int i = 0; i < numInputs; i++)
+            readOnce[i] = false;
 
-        if(distinct) {
-            // only set the key which has the whole
-            // tuple
-            res = mTupleFactory.newTuple(1);
-            res.set(0, key);
-        } else {
-            //Create numInputs bags
+        if (isInputAttached()) {
+            // Create numInputs bags
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
 
             if (isAccumulative()) {
+                readOnce[numInputs - 1] = false;
                 // create bag wrapper to pull tuples in many batches
                 // all bags have reference to the sample tuples buffer
                 // which contains tuples from one batch
@@ -251,22 +216,35 @@ public class POPackage extends PhysicalO
                 }
 
             } else {
+                readOnce[numInputs - 1] = true;
+                // We know the tuples will come sorted by index, so we can wrap
+                // the last input in a ReadOnceBag and let the Packager decide
+                // whether or not to read into memory
+
                 // create bag to pull all tuples out of iterator
                 for (int i = 0; i < numInputs; i++) {
-                    dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
-                    // In a very rare case if there is a POStream after this
-                    // POPackage in the pipeline and is also blocking the pipeline;
-                    // constructor argument should be 2 * numInputs. But for one obscure
-                    // case we don't want to pay the penalty all the time.
+                    dbs[i] = useDefaultBag ? BagFactory.getInstance()
+                            .newDefaultBag()
+                            // In a very rare case if there is a POStream after this
+                            // POPackage in the pipeline and is also blocking the
+                            // pipeline;
+                            // constructor argument should be 2 * numInputs. But for one
+                            // obscure
+                            // case we don't want to pay the penalty all the time.
                             : new InternalCachedBag(numInputs);
                 }
-                //For each indexed tup in the inp, sort them
-                //into their corresponding bags based
-                //on the index
+                // For each indexed tup in the inp, sort them
+                // into their corresponding bags based
+                // on the index
                 while (tupIter.hasNext()) {
                     NullableTuple ntup = tupIter.next();
                     int index = ntup.getIndex();
-                    Tuple copy = getValueTuple(ntup, index);
+                    if (index == numInputs - 1) {
+                        dbs[index] = new PeekedBag(pkgr, ntup, tupIter,
+                                keyWritable);
+                        break;
+                    }
+                    Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
 
                     if (numInputs == 1) {
 
@@ -278,109 +256,29 @@ public class POPackage extends PhysicalO
                     } else {
                         dbs[index].add(copy);
                     }
-                    if(getReporter()!=null) {
+                    if (getReporter() != null) {
                         getReporter().progress();
                     }
                 }
             }
-
-            //Construct the output tuple by appending
-            //the key and all the above constructed bags
-            //and return it.
-            res = mTupleFactory.newTuple(numInputs+1);
-            res.set(0,key);
-            int i=-1;
-            for (DataBag bag : dbs) {
-                i++;
-                if(inner[i] && !isAccumulative()){
-                    if(bag.size()==0){
-                        detachInput();
-                        Result r = new Result();
-                        r.returnStatus = POStatus.STATUS_NULL;
-                        return r;
-                    }
-                }
-
-                res.set(i+1,bag);
-            }
+            // Construct the output tuple by appending
+            // the key and all the above constructed bags
+            // and return it.
+            pkgr.attachInput(key, dbs, readOnce);
+            detachInput();
         }
-        Result r = new Result();
-        r.returnStatus = POStatus.STATUS_OK;
-        if (!isAccumulative())
-            r.result = illustratorMarkup(null, res, 0);
-        else
-            r.result = res;
-        detachInput();
-        return r;
-    }
-
-    protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException {
-     // Need to make a copy of the value, as hadoop uses the same ntup
-        // to represent each value.
-        Tuple val = (Tuple)ntup.getValueAsPigType();
-
-        Tuple copy = null;
-        // The "value (val)" that we just got may not
-        // be the complete "value". It may have some portions
-        // in the "key" (look in POLocalRearrange for more comments)
-        // If this is the case we need to stitch
-        // the "value" together.
-        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-            keyInfo.get(index);
-        boolean isProjectStar = lrKeyInfo.first;
-        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
-        int keyLookupSize = keyLookup.size();
-
-        if( keyLookupSize > 0) {
-
-            // we have some fields of the "value" in the
-            // "key".
-            int finalValueSize = keyLookupSize + val.size();
-            copy = mTupleFactory.newTuple(finalValueSize);
-            int valIndex = 0; // an index for accessing elements from
-                              // the value (val) that we have currently
-            for(int i = 0; i < finalValueSize; i++) {
-                Integer keyIndex = keyLookup.get(i);
-                if(keyIndex == null) {
-                    // the field for this index is not in the
-                    // key - so just take it from the "value"
-                    // we were handed
-                    copy.set(i, val.get(valIndex));
-                    valIndex++;
-                } else {
-                    // the field for this index is in the key
-                    if(isKeyTuple && isKeyCompound) {
-                        // the key is a tuple, extract the
-                        // field out of the tuple
-                        copy.set(i, keyAsTuple.get(keyIndex));
-                    } else {
-                        copy.set(i, key);
-                    }
-                }
-            }
-            copy = illustratorMarkup2(val, copy);
-        } else if (isProjectStar) {
 
-            // the whole "value" is present in the "key"
-            copy = mTupleFactory.newTuple(keyAsTuple.getAll());
-            copy = illustratorMarkup2(keyAsTuple, copy);
-        } else {
-
-            // there is no field of the "value" in the
-            // "key" - so just make a copy of what we got
-            // as the "value"
-            copy = mTupleFactory.newTuple(val.getAll());
-            copy = illustratorMarkup2(val, copy);
-        }
-        return copy;
+        return pkgr.getNext();
     }
 
-    public byte getKeyType() {
-        return keyType;
+    public Packager getPkgr() {
+        return pkgr;
     }
 
-    public void setKeyType(byte keyType) {
-        this.keyType = keyType;
+    public void setPkgr(Packager pkgr) {
+        this.pkgr = pkgr;
+        pkgr.setParent(this);
+        pkgr.setIllustrator(illustrator);
     }
 
     /**
@@ -393,74 +291,11 @@ public class POPackage extends PhysicalO
         clone.mKey = new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
         clone.requestedParallelism = requestedParallelism;
         clone.resultType = resultType;
-        clone.keyType = keyType;
         clone.numInputs = numInputs;
-        if (inner!=null)
-        {
-            clone.inner = new boolean[inner.length];
-            for (int i = 0; i < inner.length; i++) {
-                clone.inner[i] = inner[i];
-            }
-        }
-        else
-            clone.inner = null;
+        clone.pkgr = (Packager) this.pkgr.clone();
         return clone;
     }
 
-    /**
-     * @param keyInfo the keyInfo to set
-     */
-    public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
-        this.keyInfo = keyInfo;
-    }
-
-    /**
-     * @param keyTuple the keyTuple to set
-     */
-    public void setKeyTuple(boolean keyTuple) {
-        this.isKeyTuple = keyTuple;
-    }
-
-    /**
-     * @param keyCompound the keyCompound to set
-     */
-    public void setKeyCompound(boolean keyCompound) {
-        this.isKeyCompound = keyCompound;
-    }
-
-    /**
-     * @return the keyInfo
-     */
-    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
-        return keyInfo;
-    }
-
-    /**
-     * @return the distinct
-     */
-    public boolean isDistinct() {
-        return distinct;
-    }
-
-    /**
-     * @param distinct the distinct to set
-     */
-    public void setDistinct(boolean distinct) {
-        this.distinct = distinct;
-    }
-
-    public void setUseSecondaryKey(boolean useSecondaryKey) {
-        this.useSecondaryKey = useSecondaryKey;
-    }
-
-    public void setPackageType(PackageType type) {
-        this.pkgType = type;
-    }
-
-    public PackageType getPackageType() {
-        return pkgType;
-    }
-
     class POPackageTupleBuffer implements AccumulativeTupleBuffer {
         private List<Tuple>[] bags;
         private Iterator<NullableTuple> iter;
@@ -499,25 +334,26 @@ public class POPackage extends PhysicalO
             key = currKey;
             for(int i=0; i<batchSize; i++) {
                 if (iter.hasNext()) {
-                     NullableTuple ntup = iter.next();
-                     int index = ntup.getIndex();
-                     Tuple copy = getValueTuple(ntup, index);
-                     if (numInputs == 1) {
-
-                            // this is for multi-query merge where
-                            // the numInputs is always 1, but the index
-                            // (the position of the inner plan in the
-                            // enclosed operator) may not be 1.
-                            bags[0].add(copy);
-                     } else {
-                            bags[index].add(copy);
-                     }
+                    NullableTuple ntup = iter.next();
+                    int index = ntup.getIndex();
+                    Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
+                    if (numInputs == 1) {
+
+                        // this is for multi-query merge where
+                        // the numInputs is always 1, but the index
+                        // (the position of the inner plan in the
+                        // enclosed operator) may not be 1.
+                        bags[0].add(copy);
+                    } else {
+                        bags[index].add(copy);
+                    }
                 }else{
                     break;
                 }
             }
         }
 
+        @Override
         public void clear() {
             for(int i=0; i<bags.length; i++) {
                 bags[i].clear();
@@ -525,6 +361,7 @@ public class POPackage extends PhysicalO
             iter = null;
         }
 
+        @Override
         public Iterator<Tuple> getTuples(int index) {
             return bags[index].iterator();
         }
@@ -532,76 +369,82 @@ public class POPackage extends PhysicalO
         public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
             return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
         }
-       };
+    };
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return pkgr.illustratorMarkup(in, out, eqClassIndex);
+    }
+
+    public int numberOfEquivalenceClasses() {
+        return pkgr.numberOfEquivalenceClasses();
+    }
+
+    // A ReadOnceBag that we've already "peeked" at
+    private static class PeekedBag extends ReadOnceBag {
+        /**
+         * 
+         */
+        private static final long serialVersionUID = 1L;
+        NullableTuple head;
+        int index;
+
+        public PeekedBag(Packager pkgr, NullableTuple head,
+                Iterator<NullableTuple> tupIter,
+                PigNullableWritable keyWritable) {
+            super(pkgr, tupIter, keyWritable);
+            this.head = head;
+            this.index = head.getIndex();
+        }
 
-       private Tuple illustratorMarkup2(Object in, Object out) {
-           if(illustrator != null) {
-               ExampleTuple tOut = new ExampleTuple((Tuple) out);
-               illustrator.getLineage().insert(tOut);
-               tOut.synthetic = ((ExampleTuple) in).synthetic;
-               illustrator.getLineage().union(tOut, (Tuple) in);
-               return tOut;
-           } else
-               return (Tuple) out;
-       }
-
-       @Override
-       public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-           if(illustrator != null) {
-               ExampleTuple tOut = new ExampleTuple((Tuple) out);
-               LineageTracer lineageTracer = illustrator.getLineage();
-               lineageTracer.insert(tOut);
-               Tuple tmp;
-               boolean synthetic = false;
-               if (illustrator.getEquivalenceClasses() == null) {
-                   LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
-                   for (int i = 0; i < numInputs; ++i) {
-                       IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
-                       equivalenceClasses.add(equivalenceClass);
-                   }
-                   illustrator.setEquivalenceClasses(equivalenceClasses, this);
-               }
-
-               if (distinct) {
-                   int count;
-                   for (count = 0; tupIter.hasNext(); ++count) {
-                       NullableTuple ntp = tupIter.next();
-                       tmp = (Tuple) ntp.getValueAsPigType();
-                       if (!tmp.equals(tOut))
-                           lineageTracer.union(tOut, tmp);
-                   }
-                   if (count > 1) // only non-distinct tuples are inserted into the equivalence class
-                       illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
-                   illustrator.addData((Tuple) tOut);
-                   return (Tuple) tOut;
-               }
-               boolean outInEqClass = true;
-               try {
-                   for (int i = 1; i < numInputs+1; i++)
-                   {
-                       DataBag dbs = (DataBag) ((Tuple) out).get(i);
-                       Iterator<Tuple> iter = dbs.iterator();
-                       if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
-                           outInEqClass = false;
-                       while (iter.hasNext()) {
-                           tmp = iter.next();
-                           // any of synthetic data in bags causes the output tuple to be synthetic
-                           if (!synthetic && ((ExampleTuple)tmp).synthetic)
-                               synthetic = true;
-                           lineageTracer.union(tOut, tmp);
-                       }
-                   }
-               } catch (ExecException e) {
-                 // TODO better exception handling
-                 throw new RuntimeException("Illustrator exception :"+e.getMessage());
-               }
-               if (outInEqClass)
-                   illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
-               tOut.synthetic = synthetic;
-               illustrator.addData((Tuple) tOut);
-               return tOut;
-           } else
-               return (Tuple) out;
-       }
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new Iterator<Tuple>() {
+                boolean headReturned = false;
+
+                @Override
+                public boolean hasNext() {
+                    if (!headReturned)
+                        return true;
+
+                    return tupIter.hasNext();
+                }
+
+                @Override
+                public Tuple next() {
+                    if (!headReturned) {
+                        headReturned = true;
+                        try {
+                            return pkgr.getValueTuple(keyWritable, head,
+                                    head.getIndex());
+                        } catch (ExecException e) {
+                            throw new RuntimeException(
+                                    "PeekedBag failed to get value tuple : "
+                                            + e.toString());
+                        }
+                    }
+                    NullableTuple ntup = tupIter.next();
+                    Tuple ret = null;
+                    try {
+                        ret = pkgr.getValueTuple(keyWritable, ntup, index);
+                    } catch (ExecException e) {
+                        throw new RuntimeException(
+                                "PeekedBag failed to get value tuple : "
+                                        + e.toString());
+                    }
+                    if (getReporter() != null) {
+                        getReporter().progress();
+                    }
+                    return ret;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException(
+                            "PeekedBag does not support removal");
+                }
+            };
+        }
+    }
 
 }
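PeekedBag above exists because getNextTuple() has already pulled the first tuple of the last input off the Hadoop iterator while sorting tuples into bags, so the bag's iterator must replay that peeked head before handing out the remaining tuples. The pattern in isolation looks like the generic sketch below (illustrative only, not Pig code).

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public class PeekedIteratorSketch<T> implements Iterator<T> {
        private final T head;                 // the element consumed while peeking
        private final Iterator<T> rest;       // the remaining, un-consumed elements
        private boolean headReturned = false;

        public PeekedIteratorSketch(T head, Iterator<T> rest) {
            this.head = head;
            this.rest = rest;
        }

        @Override
        public boolean hasNext() {
            return !headReturned || rest.hasNext();
        }

        @Override
        public T next() {
            if (!headReturned) {
                headReturned = true;
                return head;                  // replay the peeked element first
            }
            if (!rest.hasNext()) {
                throw new NoSuchElementException();
            }
            return rest.next();
        }
    }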

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sun Apr  6 10:54:13 2014
@@ -229,7 +229,7 @@ public class POPartialAgg extends Physic
                 }
             }
             avgTupleSize = estTotalMem / estTuples;
-            int totalTuples = memLimits.getCacheLimit();
+            long totalTuples = memLimits.getCacheLimit();
             LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
             firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
             secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Sun Apr  6 10:54:13 2014
@@ -0,0 +1,486 @@
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.Illustrable;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+public class Packager implements Illustrable, Serializable, Cloneable {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    protected boolean[] readOnce;
+
+    protected DataBag[] bags;
+
+    public static enum PackageType {
+        GROUP, JOIN
+    };
+
+    protected transient Illustrator illustrator = null;
+
+    // The key being worked on
+    Object key;
+
+    // marker to indicate if key is a tuple
+    protected boolean isKeyTuple = false;
+    // marker to indicate if the tuple key is compound in nature
+    protected boolean isKeyCompound = false;
+
+    // key's type
+    byte keyType;
+
+    // The number of inputs to this
+    // co-group. 0 indicates a distinct, which means there will only be a
+    // key, no value.
+    int numInputs;
+
+    // If the attaching map-reduce plan uses a secondary sort key
+    boolean useSecondaryKey = false;
+
+    // Denotes if inner is specified
+    // on a particular input
+    boolean[] inner;
+
+    // flag to denote whether there is a distinct
+    // leading to this package
+    protected boolean distinct = false;
+
+    // A mapping of input index to the key information obtained from the
+    // LORearrange for that index. The key information is a Pair of (Boolean,
+    // Map). The Boolean indicates whether there is a lone project(*) in the
+    // cogroup by; if not, the Map maps column numbers in the "value" to the
+    // column numbers in the "key" which contain those fields of the
+    // "value".
+    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+    private PackageType pkgType;
+
+    boolean firstTime = true;
+    boolean useDefaultBag = false;
+
+    protected POPackage parent = null;
+
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory
+            .getInstance();
+
+    public Object getKey(PigNullableWritable key) throws ExecException {
+        Object keyObject = key.getValueAsPigType();
+        if (useSecondaryKey) {
+            return ((Tuple) keyObject).get(0);
+        } else {
+            return keyObject;
+        }
+    }
+
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        checkBagType();
+
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // We assume that we need all bags materialized. Specialized subclasses
+        // may choose to handle this differently
+        for (int i = 0; i < bags.length; i++) {
+            if (readOnce[i]) {
+                DataBag materializedBag = getBag();
+                materializedBag.addAll(bags[i]);
+                bags[i] = materializedBag;
+            }
+        }
+    }
+
+    public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+        Tuple res;
+
+        if (isDistinct()) {
+            // only set the key which has the whole
+            // tuple
+            res = mTupleFactory.newTuple(1);
+            res.set(0, key);
+        } else {
+            // Construct the output tuple by appending
+            // the key and all the above constructed bags
+            // and return it.
+            res = mTupleFactory.newTuple(numInputs + 1);
+            res.set(0, key);
+            int i = -1;
+            for (DataBag bag : bags) {
+                i++;
+                if (inner[i]) {
+                    if (bag.size() == 0) {
+                        detachInput();
+                        Result r = new Result();
+                        r.returnStatus = POStatus.STATUS_NULL;
+                        return r;
+                    }
+                }
+
+                res.set(i + 1, bag);
+            }
+        }
+        Result r = new Result();
+        r.returnStatus = POStatus.STATUS_OK;
+        r.result = illustratorMarkup(null, res, 0);
+        detachInput();
+        return r;
+    }
+
+    public void detachInput() {
+        key = null;
+        bags = null;
+    }
+
+    protected Tuple illustratorMarkup2(Object in, Object out) {
+        if (illustrator != null) {
+            ExampleTuple tOut;
+            if (!(out instanceof ExampleTuple)) {
+                tOut = new ExampleTuple((Tuple) out);
+            } else {
+                tOut = (ExampleTuple) out;
+            }
+            illustrator.getLineage().insert(tOut);
+            tOut.synthetic = ((ExampleTuple) in).synthetic;
+            illustrator.getLineage().union(tOut, (Tuple) in);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
+
+    protected Tuple starMarkup(Tuple key, Tuple val, Tuple out){
+        if (illustrator != null){
+            Tuple copy = illustratorMarkup2(key, out);
+            // For distinct, we also need to retain lineage information from the values.
+            if (isDistinct())
+                copy = illustratorMarkup2(val, out);
+            return copy;
+        } else
+            return (Tuple) out;
+    }
+
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        Object key = getKey(keyWritable);
+        // Need to make a copy of the value, as hadoop uses the same ntup
+        // to represent each value.
+        Tuple val = (Tuple) ntup.getValueAsPigType();
+
+        Tuple copy = null;
+        // The "value (val)" that we just got may not
+        // be the complete "value". It may have some portions
+        // in the "key" (look in POLocalRearrange for more comments)
+        // If this is the case we need to stitch
+        // the "value" together.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = keyInfo.get(index);
+        boolean isProjectStar = lrKeyInfo.first;
+        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+        int keyLookupSize = keyLookup.size();
+
+        if (keyLookupSize > 0) {
+
+            // we have some fields of the "value" in the
+            // "key".
+            int finalValueSize = keyLookupSize + val.size();
+            copy = mTupleFactory.newTuple(finalValueSize);
+            int valIndex = 0; // an index for accessing elements from
+            // the value (val) that we have currently
+            for (int i = 0; i < finalValueSize; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if (keyIndex == null) {
+                    // the field for this index is not in the
+                    // key - so just take it from the "value"
+                    // we were handed
+                    copy.set(i, val.get(valIndex));
+                    valIndex++;
+                } else {
+                    // the field for this index is in the key
+                    if (isKeyTuple && isKeyCompound) {
+                        // the key is a tuple, extract the
+                        // field out of the tuple
+                        copy.set(i, ((Tuple) key).get(keyIndex));
+                    } else {
+                        copy.set(i, key);
+                    }
+                }
+            }
+            copy = illustratorMarkup2(val, copy);
+        } else if (isProjectStar) {
+
+            // the whole "value" is present in the "key"
+            copy = mTupleFactory.newTuple(((Tuple) key).getAll());
+            copy = starMarkup((Tuple) key, val, copy);
+        } else {
+
+            // there is no field of the "value" in the
+            // "key" - so just make a copy of what we got
+            // as the "value"
+            copy = mTupleFactory.newTuple(val.getAll());
+            copy = illustratorMarkup2(val, copy);
+        }
+        return copy;
+    }
+
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    /**
+     * @return the isKeyTuple
+     */
+    public boolean getKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @return the keyAsTuple
+     */
+    public Tuple getKeyAsTuple() {
+        return isKeyTuple ? (Tuple) key : null;
+    }
+
+    /**
+     * @return the key
+     */
+    public Object getKey() {
+        return key;
+    }
+
+    public boolean[] getInner() {
+        return inner;
+    }
+
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
+
+    /**
+     * @param keyInfo
+     *            the keyInfo to set
+     */
+    public void setKeyInfo(
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+    }
+
+    /**
+     * @param keyTuple
+     *            the keyTuple to set
+     */
+    public void setKeyTuple(boolean keyTuple) {
+        this.isKeyTuple = keyTuple;
+    }
+
+    /**
+     * @param keyCompound
+     *            the keyCompound to set
+     */
+    public void setKeyCompound(boolean keyCompound) {
+        this.isKeyCompound = keyCompound;
+    }
+
+    /**
+     * @return the keyInfo
+     */
+    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
+        return keyInfo;
+    }
+
+    public Illustrator getIllustrator() {
+        return illustrator;
+    }
+
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        this.illustrator = illustrator;
+    }
+
+    /**
+     * @return the distinct
+     */
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    /**
+     * @param distinct
+     *            the distinct to set
+     */
+    public void setDistinct(boolean distinct) {
+        this.distinct = distinct;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public void setPackageType(PackageType type) {
+        this.pkgType = type;
+    }
+
+    public PackageType getPackageType() {
+        return pkgType;
+    }
+
+    public int getNumInputs(byte index) {
+        return numInputs;
+    }
+
+    public int getNumInputs() {
+        return numInputs;
+    }
+
+    public void setNumInputs(int numInputs) {
+        this.numInputs = numInputs;
+    }
+
+    @Override
+    public Packager clone() throws CloneNotSupportedException {
+        Packager clone = (Packager) super.clone();
+        clone.setNumInputs(numInputs);
+        clone.setPackageType(pkgType);
+        clone.setDistinct(distinct);
+        if (inner != null) {
+            clone.inner = new boolean[inner.length];
+            for (int i = 0; i < inner.length; i++) {
+                clone.inner[i] = inner[i];
+            }
+        } else
+            clone.inner = null;
+        if (keyInfo != null)
+            clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
+                    keyInfo));
+        clone.setKeyCompound(isKeyCompound);
+        clone.setKeyTuple(isKeyTuple);
+        clone.setKeyType(keyType);
+        clone.setUseSecondaryKey(useSecondaryKey);
+        return clone;
+    }
+
+    public String name() {
+        return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // All customized packagers are introduced during MRCompilation.
+        // Illustrate happens before that, so we only have to handle the basic
+        // POPackage here.
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            boolean synthetic = false;
+            if (illustrator.getEquivalenceClasses() == null) {
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < numInputs; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+            }
+
+            if (isDistinct()) {
+                int count = 0;
+                for (Tuple tmp : bags[0]){
+                    count++;
+                    if (!tmp.equals(tOut))
+                        lineageTracer.union(tOut, tmp);
+                }
+                if (count > 1) // only non-distinct tuples are inserted into the
+                    // equivalence class
+                    illustrator.getEquivalenceClasses().get(eqClassIndex)
+                    .add(tOut);
+                illustrator.addData((Tuple) tOut);
+                return (Tuple) tOut;
+            }
+            boolean outInEqClass = true;
+            try {
+                for (int i = 1; i < numInputs + 1; i++) {
+                    DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                    Iterator<Tuple> iter = dbs.iterator();
+                    if (dbs.size() <= 1 && outInEqClass) // keep the output in the eq.
+                        // class only if all inputs have >= 2 records
+                        outInEqClass = false;
+                    while (iter.hasNext()) {
+                        Tuple tmp = iter.next();
+                        // any synthetic data in the bags causes the output
+                        // tuple to be synthetic
+                        if (!synthetic && ((ExampleTuple) tmp).synthetic)
+                            synthetic = true;
+                        lineageTracer.union(tOut, tmp);
+                    }
+                }
+            } catch (ExecException e) {
+                // TODO better exception handling
+                throw new RuntimeException("Illustrator exception :"
+                        + e.getMessage());
+            }
+            if (outInEqClass)
+                illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            tOut.synthetic = synthetic;
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
+
+    public void setParent(POPackage pack) {
+        parent = pack;
+    }
+
+    public int numberOfEquivalenceClasses() {
+        return 1;
+    }
+
+    public void checkBagType() {
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+    }
+
+    public DataBag getBag(){
+        return useDefaultBag ? mBagFactory.newDefaultBag()
+                // In the very rare case where there is a POStream after this
+                // POJoinPackage in the pipeline that is also blocking the pipeline,
+                // the constructor argument should be 2 * numInputs. But we don't
+                // want to pay the penalty all the time for that one obscure case.
+                : new InternalCachedBag(numInputs-1);
+    }
+}
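
A minimal driver sketch (not part of the patch) of the Packager contract that
POPackage relies on: attach the key and one bag per input, then pull the
repackaged (key, bag, ...) tuple from getNext(). It assumes it runs outside an
actual map-reduce task (checkBagType() tolerates a missing job conf), and the
bag contents are made up.

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.TupleFactory;

    public class PackagerSketch {
        public static void main(String[] args) throws ExecException {
            Packager pkgr = new Packager();
            pkgr.setNumInputs(2);
            pkgr.setInner(new boolean[] { false, false }); // outer on both inputs

            DataBag left = BagFactory.getInstance().newDefaultBag();
            left.add(TupleFactory.getInstance().newTuple("a"));
            DataBag right = BagFactory.getInstance().newDefaultBag();

            // key "k" with already-materialized bags, so readOnce is false for both
            pkgr.attachInput("k", new DataBag[] { left, right },
                    new boolean[] { false, false });

            Result r = pkgr.getNext(); // r.result is (k,{(a)},{})
            System.out.println(r.result);
        }
    }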

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Sun Apr  6 10:54:13 2014
@@ -51,7 +51,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
@@ -59,13 +58,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -209,20 +206,6 @@ public class PlanHelper {
         }
 
         @Override
-        public void visitCombinerPackage(POCombinerPackage pkg)
-                throws VisitorException {
-            super.visitCombinerPackage(pkg);
-            visit(pkg);
-        }
-
-        @Override
-        public void visitMultiQueryPackage(POMultiQueryPackage pkg)
-                throws VisitorException {
-            super.visitMultiQueryPackage(pkg);
-            visit(pkg);
-        }
-
-        @Override
         public void visitPOForEach(POForEach nfe) throws VisitorException {
             super.visitPOForEach(nfe);
             visit(nfe);
@@ -400,13 +383,6 @@ public class PlanHelper {
         }
 
         @Override
-        public void visitJoinPackage(POJoinPackage joinPackage)
-                throws VisitorException {
-            super.visitJoinPackage(joinPackage);
-            visit(joinPackage);
-        }
-
-        @Override
         public void visitCast(POCast cast) {
             super.visitCast(cast);
             visit(cast);
@@ -449,7 +425,6 @@ public class PlanHelper {
             visit(stream);
         }
 
-        @Override
         public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
             super.visitSkewedJoin(sk);
             visit(sk);
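
With the per-subclass packages gone, plan traversals only need the POPackage
hook and can branch on the attached Packager. A hypothetical visitor sketch
(not in the patch; the class name and walker choice are illustrative):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
    import org.apache.pig.impl.plan.DependencyOrderWalker;
    import org.apache.pig.impl.plan.VisitorException;

    public class JoinPackageFinder extends PhyPlanVisitor {
        private boolean found = false;

        public JoinPackageFinder(PhysicalPlan plan) {
            super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitPackage(POPackage pkg) throws VisitorException {
            // one visit method now covers what visitJoinPackage et al. used to do
            if (pkg.getPkgr() instanceof JoinPackager) {
                found = true;
            }
        }

        public boolean foundJoinPackager() {
            return found;
        }
    }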

Modified: pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Sun Apr  6 10:54:13 2014
@@ -22,30 +22,28 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
+
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 /**
- * This bag is specifically created for use by POPackageLite. So it has three 
- * properties, the NullableTuple iterator, the key (Object) and the keyInfo 
- * (Map<Integer, Pair<Boolean, Map<Integer, Integer>>>) all three 
- * of which are required in the constructor call. This bag does not store 
- * the tuples in memory, but has access to an iterator typically provided by 
- * Hadoop. Use this when you already have an iterator over tuples and do not 
- * want to copy over again to a new bag.
+ * This bag does not store the tuples in memory, but has access to an iterator
+ * typically provided by Hadoop. Use this when you already have an iterator over
+ * tuples and do not want to copy over again to a new bag.
  */
 public class ReadOnceBag implements DataBag {
 
-    // The Package operator that created this
-    POPackageLite pkg;
+    // The Packager that created this
+    protected Packager pkgr;
     
     //The iterator of Tuples. Marked transient because we will never serialize this.
-    transient Iterator<NullableTuple> tupIter;
+    protected transient Iterator<NullableTuple> tupIter;
     
     // The key being worked on
-    Object key;
+    protected PigNullableWritable keyWritable;
 
     /**
      * 
@@ -60,10 +58,11 @@ public class ReadOnceBag implements Data
      * @param tupIter Iterator<NullableTuple>
      * @param key Object
      */
-    public ReadOnceBag(POPackageLite pkg, Iterator<NullableTuple> tupIter, Object key) {
-        this.pkg = pkg;
+    public ReadOnceBag(Packager pkgr, Iterator<NullableTuple> tupIter,
+            PigNullableWritable keyWritable) {
+        this.pkgr = pkgr;
         this.tupIter = tupIter;
-        this.key = key;
+        this.keyWritable = keyWritable;
     }
 
     /* (non-Javadoc)
@@ -177,27 +176,23 @@ public class ReadOnceBag implements Data
 
     @Override
     public boolean equals(Object other) {
-        if(other instanceof ReadOnceBag)
-        {
-            if(pkg.getKeyTuple())
-            {
-                if(tupIter == ((ReadOnceBag)other).tupIter && pkg.getKeyTuple() == ((ReadOnceBag)other).pkg.getKeyTuple() && pkg.getKeyAsTuple().equals(((ReadOnceBag)other).pkg.getKeyAsTuple()))
-                {
+        if (other instanceof ReadOnceBag) {
+            if (pkgr.getKeyTuple()) {
+                if (tupIter == ((ReadOnceBag) other).tupIter
+                        && pkgr.getKeyTuple() == ((ReadOnceBag) other).pkgr
+                                .getKeyTuple()
+                        && pkgr.getKeyAsTuple().equals(
+                                ((ReadOnceBag) other).pkgr.getKeyAsTuple())) {
                     return true;
-                }
-                else
-                {
+                } else {
                     return false;
                 }
-            }
-            else
-            {
-                if(tupIter == ((ReadOnceBag)other).tupIter && pkg.getKey().equals(((ReadOnceBag)other).pkg.getKey()))
-                {
+            } else {
+                if (tupIter == ((ReadOnceBag) other).tupIter
+                        && pkgr.getKey().equals(
+                                ((ReadOnceBag) other).pkgr.getKey())) {
                     return true;
-                }
-                else
-                {
+                } else {
                     return false;
                 }
             }
@@ -208,18 +203,18 @@ public class ReadOnceBag implements Data
     @Override
     public int hashCode() {
     	int hash = 7;
-        if(pkg.getKeyTuple())
+        if (pkgr.getKeyTuple())
         {
-            hash = hash*31 + pkg.getKeyAsTuple().hashCode();
+            hash = hash * 31 + pkgr.getKeyAsTuple().hashCode();
         }
         else
         {
-        	hash = hash*31 + pkg.getKey().hashCode();
+            hash = hash * 31 + pkgr.getKey().hashCode();
         }
         return hash;
     }
 
-    class ReadOnceBagIterator implements Iterator<Tuple>
+    protected class ReadOnceBagIterator implements Iterator<Tuple>
     {
         /* (non-Javadoc)
          * @see java.util.Iterator#hasNext()
@@ -238,7 +233,7 @@ public class ReadOnceBag implements Data
             int index = ntup.getIndex();
             Tuple ret = null;
             try {
-                ret = pkg.getValueTuple(ntup, index, key);
+                ret = pkgr.getValueTuple(keyWritable, ntup, index);
             } catch (ExecException e)
             {
             	throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
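
A hypothetical usage sketch (not from the patch) of the new constructor:
wrapping a reducer-style Iterator<NullableTuple> so tuples are re-stitched
lazily by the Packager instead of being copied into a materialized bag. The
keyInfo entry and the data are made up; it covers only the simple case where
nothing from the "value" lives in the "key".

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.ReadOnceBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.io.NullableTuple;
    import org.apache.pig.impl.util.Pair;

    public class ReadOnceBagSketch {
        public static void main(String[] args) throws Exception {
            Packager pkgr = new Packager();
            // input 0: no value fields were moved into the key, no project(*)
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
                    new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
            keyInfo.put(0, new Pair<Boolean, Map<Integer, Integer>>(
                    false, new HashMap<Integer, Integer>()));
            pkgr.setKeyInfo(keyInfo);

            TupleFactory tf = TupleFactory.getInstance();
            NullableTuple value = new NullableTuple(tf.newTuple("payload"));
            value.setIndex((byte) 0);
            Iterator<NullableTuple> it = Collections.singletonList(value).iterator();

            NullableTuple keyWritable = new NullableTuple(tf.newTuple("k"));
            DataBag bag = new ReadOnceBag(pkgr, it, keyWritable);
            for (Tuple t : bag) {
                System.out.println(t); // (payload)
            }
        }
    }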

Modified: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Sun Apr  6 10:54:13 2014
@@ -53,10 +53,10 @@ public abstract class SelfSpillBag exten
     public static class MemoryLimits {
 
         private long maxMemUsage;
-        private int cacheLimit = Integer.MAX_VALUE;
+        private long cacheLimit = Integer.MAX_VALUE;
         private long memUsage = 0;
         private long numObjsSizeChecked = 0;
-        
+
         private static float cachedMemUsage = 0.2F;
         private static long maxMem = 0;
         static {
@@ -99,11 +99,11 @@ public abstract class SelfSpillBag exten
          * 
          * @return number of objects limit
          */
-        public int getCacheLimit() {
+        public long getCacheLimit() {
             if (numObjsSizeChecked > 0) {
                 long avgUsage = memUsage / numObjsSizeChecked;
                 if (avgUsage > 0) {
-                    cacheLimit = (int) (maxMemUsage / avgUsage);
+                    cacheLimit = maxMemUsage / avgUsage;
                 }
             }
             return cacheLimit;
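
A standalone illustration (not part of the patch, figures made up) of the
narrowing the old code performed: casting a large long result to int silently
wraps to a negative number, which is why getCacheLimit() now returns a long and
the caller in POPartialAgg above was widened to match.

    public class CacheLimitNarrowing {
        public static void main(String[] args) {
            long maxMemUsage = 100L * 1024 * 1024 * 1024; // hypothetical 100 GB budget
            long avgUsage = 40;                           // hypothetical 40 bytes per object

            long asLong = maxMemUsage / avgUsage;         // 2684354560
            int asInt = (int) (maxMemUsage / avgUsage);   // wraps to -1610612736

            System.out.println(asLong + " vs " + asInt);
        }
    }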

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Sun Apr  6 10:54:13 2014
@@ -56,7 +56,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -64,6 +63,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFrontend;
@@ -357,7 +357,7 @@ public class LogToPhyTranslationVisitor 
                     expressionPlans.put(i,loRank.getRankColPlans());
 
                 POPackage poPackage = compileToLR_GR_PackTrio(loRank, null, flags, expressionPlans);
-                poPackage.setPackageType(PackageType.GROUP);
+                poPackage.getPkgr().setPackageType(PackageType.GROUP);
                 translateSoftLinks(loRank);
 
                 List<Boolean> flattenLst = Arrays.asList(true, false);
@@ -366,7 +366,7 @@ public class LogToPhyTranslationVisitor 
                 POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
                 feproj1.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
                 feproj1.setColumn(0);
-                feproj1.setResultType(poPackage.getKeyType());
+                feproj1.setResultType(poPackage.getPkgr().getKeyType());
                 feproj1.setStar(false);
                 feproj1.setOverloaded(false);
                 fep1.add(feproj1);
@@ -667,14 +667,14 @@ public class LogToPhyTranslationVisitor 
                 throw new VisitorException(msg, errCode, PigException.BUG, e);
             }
 
-            poPackage.setKeyType(DataType.TUPLE);
+            poPackage.getPkgr().setKeyType(DataType.TUPLE);
             poPackage.setResultType(DataType.TUPLE);
             poPackage.setNumInps(count);
             boolean inner[] = new boolean[count];
             for (int i=0;i<count;i++) {
                 inner[i] = true;
             }
-            poPackage.setInner(inner);
+            poPackage.getPkgr().setInner(inner);
 
             List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
             List<Boolean> flattenLst = new ArrayList<Boolean>();
@@ -999,7 +999,7 @@ public class LogToPhyTranslationVisitor 
             break;
         case REGULAR:
             POPackage poPackage = compileToLR_GR_PackTrio(cg, cg.getCustomPartitioner(), cg.getInner(), cg.getExpressionPlans());
-            poPackage.setPackageType(PackageType.GROUP);            
+            poPackage.getPkgr().setPackageType(PackageType.GROUP);
             logToPhyMap.put(cg, poPackage);
             break;
         case MERGE:
@@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor 
                         e.getErrorCode(),e.getErrorSource(),e);
             }
             logToPhyMap.put(loj, fe);
-            poPackage.setPackageType(POPackage.PackageType.JOIN);
+            poPackage.getPkgr().setPackageType(PackageType.JOIN);
         }
         translateSoftLinks(loj);
     }
@@ -1485,10 +1485,10 @@ public class LogToPhyTranslationVisitor 
             }
         }
 
-        poPackage.setKeyType(type);
+        poPackage.getPkgr().setKeyType(type);
         poPackage.setResultType(DataType.TUPLE);
         poPackage.setNumInps(count);
-        poPackage.setInner(innerFlags);
+        poPackage.getPkgr().setInner(innerFlags);
         return poPackage;
     }