You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/12 21:06:26 UTC

svn commit: r1586885 [1/2] - in /pig/branches/tez: ./ ivy/ src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/fetch/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionen...

Author: cheolsoo
Date: Sat Apr 12 19:06:25 2014
New Revision: 1586885

URL: http://svn.apache.org/r1586885
Log:
Merge latest trunk changes

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
      - copied unchanged from r1586680, pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
Modified:
    pig/branches/tez/   (props changed)
    pig/branches/tez/CHANGES.txt
    pig/branches/tez/KEYS
    pig/branches/tez/ivy/libraries.properties
    pig/branches/tez/src/org/apache/pig/PigServer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
    pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java
    pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java
    pig/branches/tez/src/org/apache/pig/impl/PigContext.java
    pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java
    pig/branches/tez/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java
    pig/branches/tez/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
    pig/branches/tez/src/org/apache/pig/scripting/js/JsScriptEngine.java
    pig/branches/tez/src/org/apache/pig/scripting/jython/JythonScriptEngine.java
    pig/branches/tez/src/org/apache/pig/tools/grunt/GruntParser.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/branches/tez/src/pig-default.properties   (props changed)
    pig/branches/tez/test/e2e/pig/tests/macro.conf
    pig/branches/tez/test/e2e/pig/tests/multiquery.conf
    pig/branches/tez/test/e2e/pig/tests/turing_jython.conf
    pig/branches/tez/test/org/apache/pig/test/TestExampleGenerator.java
    pig/branches/tez/test/org/apache/pig/test/TestJobSubmission.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
    pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2   (props changed)

Propchange: pig/branches/tez/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1582882-1586680

Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Sat Apr 12 19:06:25 2014
@@ -30,6 +30,10 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
+
+PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
+
 PIG-3449: Move JobCreationException to org.apache.pig.backend.hadoop.executionengine (cheolsoo)
 
 PIG-3765: Ability to disable Pig commands and operators (prkommireddi)
@@ -96,9 +100,19 @@ PIG-3117: A debug mode in which pig does
 PIG-3484: Make the size of pig.script property configurable (cheolsoo)
  
 OPTIMIZATIONS
+
+PIG-3882: Multiquery off mode execution is not done in batch and very inefficient (rohini)
  
 BUG FIXES
 
+PIG-3871: Replace org.python.google.* with com.google.* in imports (cheolsoo)
+
+PIG-3858: PigLogger/PigStatusReporter is not set for fetch tasks (lbendig via cheolsoo)
+
+PIG-3798: Registered jar in pig script are appended to the classpath multiple times (cheolsoo)
+
+PIG-3844: Make ScriptState InheritableThreadLocal for threads that need it (amatsukawa via cheolsoo)
+
 PIG-3837: ant pigperf target is broken in trunk (cheolsoo)
 
 PIG-3836: Pig signature has has guava version dependency (amatsukawa via cheolsoo)
@@ -162,8 +176,6 @@ PIG-3641: Split "otherwise" producing in
 
 PIG-3682: mvn-inst target does not install pig-h2.jar into local .m2 (raluri via aniket486)
 
-PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini)
-
 PIG-3511: Security: Pig temporary directories might have world readable permissions (rohini)
 
 PIG-3664: Piggy Bank XPath UDF can't be called (nezihyigitbasi via daijy)
@@ -263,6 +275,10 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini)
+
+PIG-3819: e2e tests containing "perl -e "print $_;" fails on Hadoop 2 (daijy)
+
 PIG-3813: Rank column is assigned different uids everytime when schema is reset (cheolsoo)
 
 PIG-3833: Relation loaded by AvroStorage with schema is projected incorrectly in foreach statement (jeongjinku via cheolsoo)

Modified: pig/branches/tez/KEYS
URL: http://svn.apache.org/viewvc/pig/branches/tez/KEYS?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/KEYS (original)
+++ pig/branches/tez/KEYS Sat Apr 12 19:06:25 2014
@@ -114,3 +114,35 @@ rllbZkbS2bye7GzOLvBNqk5Z2PuUGP9henu+Q807
 YSsRv5xz/aBvo9tkoZtzYeOqDe7Ut2nkgc+DD2d6YWnmFw==
 =moXg
 -----END PGP PUBLIC KEY BLOCK-----
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG/MacGPG2 v2.0.22 (Darwin)
+Comment: GPGTools - https://gpgtools.org
+
+mQENBFM/xdgBCAC5ZzlSfaTAfzuSL50HbTkQQpEdk/oRPUDIeIIHPKoj8iaXxydP
+zdafXuFMnnJceJDbczfzsu7+owXQWV/WM/zQ8ZxXsdS2LSXPMhNkYhgJKk6VHoQc
+ssvlXr5C7kQrazeTt7IGn6eWJknGNMRivt5xpzLZ/LJfeqyiID7pl2VALxS6argN
+6zeVQJG5hk1+z2SiUrVvHDIHdH9NEZAMCclrQvXEqoTV3vO+56TRyzMVfU+zrkSM
+Pyuoii1ATjqjQT3HQyh31fIRUDD5fXDyeLbekBWWbvevtyiY4oBuk5dPgJ+3O5OV
+AWGbBVmTj+DZc8myI8dIg3saZ0D6ehzut8WPABEBAAG0InBya29tbWlyZWRkaSA8
+cHJhc2gxNzg0QGdtYWlsLmNvbT6JATcEEwEKACEFAlM/xdgCGwMFCwkIBwMFFQoJ
+CAsFFgIDAQACHgECF4AACgkQB2xWLNUxu+pENAgAl32vD24/omZa5tN+fxGKh0AD
+fYt8l+RNlhoa0ghJQWVta8siuak0mW90prnzUc1zKM4NTb1I9L8K2QfIq97DhnbU
+Cq0SRKn+hKsN/qW9dydTop8ooqzYpgF6aWx+mx6aooJhk3ZayLA1DaNIJ6frInhy
+l1j6VjDFvTXyvyf9JuhDiCMDzeFtcOJ12Yp2KDhbNKugAgMFbjmUicoRG7T4ImJ2
+N4ndKuYKOV8BLoSNjezohaHs+6PtLvYOsGMHGmu72fCCMsHg33FLHNjMS9+/2Vh9
+7QLv2/jImcJvKkw4JPd6I5LZD7AkOA4E3XE7J/cd2FmZXYQzmrERohcarAj4hLkB
+DQRTP8XYAQgAulCh4cEbmzFQTwjGjCh6r/hYw6ZAcltQMPbUxLatcysq3Y718YYV
+/MBgR+Dlqu41lR30S1QuRAH4tPVraFvuBojm7BOReXe5bQYuDShtyXdkRRxXKlA7
+FmdD783FAI2XMNlywO+Pq/4wJ/Cbb5HALwAuUF+oOTUnhkRoAS8Q665FxD4AwxgE
+WHW1UT/tyv0rEo9X4XGuufdXeBZoNpqdt16BsW/rmFrXesv//+lp9X8iLQMccd3l
+Jnv6v5e0WTBFbBChy5W0vEYFlZ5N6nyNNgmindTa+kKcdrkv3YP74KRBIG8TXBC9
+HUNXDGMqMAlmLGJtzp0yNupvkRp0t8pHoQARAQABiQEfBBgBCgAJBQJTP8XYAhsM
+AAoJEAdsVizVMbvqSWsH/isN8m8fRJFrVCJAlDb54ABeFa6xwGodddosKmrTi83s
+CJ5uJl+G3d2CK4kTePyj9VGoRw5BA+3anotGQXhcQzfjPcraepWkj6RbWV1tnEQm
+IzXtIIh/fsswEroIwZlrJ3/qI6u+DrZYF5GF9SfA+qTRjlH/8HOzdpAU2ZlBIzT/
+guy3kvmcHWXO7JHSjDNz4XGTbkLYkh20le1XTlMdDy1OSIIGRVWBgyWHICZHXNnk
+I/0TpEVFgcGGTT2CSqLAFUlFdL6CzpwpGXhUxzSmdW5ULIs4dm2GulCKwMArfQE+
+URdA1jknkPjIIC1wBetzMrIbfcsoZS4SLbZm9u8C+2k=
+=u7gH
+-----END PGP PUBLIC KEY BLOCK-----

Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Sat Apr 12 19:06:25 2014
@@ -91,5 +91,5 @@ mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
 snappy.version=1.1.0.1
-tez.version=0.4.0-incubating
+tez.version=0.5.0-incubating-SNAPSHOT
 parquet-pig-bundle.version=1.2.3

Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Sat Apr 12 19:06:25 2014
@@ -99,12 +99,12 @@ import org.apache.pig.parser.QueryParser
 import org.apache.pig.pen.ExampleGenerator;
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.ScriptState;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.validator.BlackAndWhitelistFilter;
 import org.apache.pig.validator.BlackAndWhitelistValidator;
 import org.apache.pig.validator.PigCommandFilter;
@@ -154,8 +154,6 @@ public class PigServer {
 
     protected final String scope = constructScope();
 
-
-    private boolean isMultiQuery = true;
     private boolean aggregateWarning = true;
 
     private boolean validateEachStatement = false;
@@ -235,8 +233,6 @@ public class PigServer {
         currDAG = new Graph(false);
 
         aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-        isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties()
-                .getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
 
         jobName = pigContext.getProperties().getProperty(
                 PigContext.JOB_NAME,
@@ -347,7 +343,7 @@ public class PigServer {
         if (currDAG != null) {
             graphs.push(currDAG);
         }
-        currDAG = new Graph(isMultiQuery);
+        currDAG = new Graph(true);
     }
 
     /**
@@ -420,13 +416,7 @@ public class PigServer {
             parseAndBuild();
         }
 
-        PigStats stats = null;
-        if( !isMultiQuery ) {
-            // ignore if multiquery is off
-            stats = PigStats.get();
-        } else {
-            stats = execute();
-        }
+        PigStats stats = execute();
 
         return getJobs(stats);
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Sat Apr 12 19:06:25 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -40,8 +41,9 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.EmptyPigStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.joda.time.DateTimeZone;
 
 /**
@@ -128,6 +130,13 @@ public class FetchLauncher {
             // ensure that the internal timezone is uniformly in UTC offset style
             DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
         }
+        
+        boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+        PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+        pigHadoopLogger.setAggregate(aggregateWarning);
+        PigStatusReporter.getInstance().setFetchContext(new FetchContext());
+        pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+        PhysicalOperator.setPigLogger(pigHadoopLogger);
     }
 
     private void runPipeline(POStore posStore) throws IOException {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Sat Apr 12 19:06:25 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sat Apr 12 19:06:25 2014
@@ -1610,7 +1610,7 @@ public class JobControlCompiler{
         Path pathInHDFS = shipToHDFS(pigContext, conf, url);
         // and add to the DistributedCache
         DistributedCache.addFileToClassPath(pathInHDFS, conf);
-        pigContext.skipJars.add(url.getPath());
+        pigContext.addSkipJar(url.getPath());
     }
 
     private static Path getCacheStagingDir(Configuration conf) throws IOException {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Sat Apr 12 19:06:25 2014
@@ -2749,16 +2749,27 @@ public class MRCompiler extends PhyPlanV
             JoinPackager pkgr = new JoinPackager(pack.getPkgr(), forEach);
             pkgr.setChunkSize(Long.parseLong(chunkSize));
             pack.setPkgr(pkgr);
+            List<PhysicalOperator> succs = plan.getSuccessors(forEach);
+            if (succs != null) {
+                if (succs.size() != 1) {
+                    int errCode = 2028;
+                    String msg = "ForEach can only have one successor. Found "
+                            + succs.size() + " successors.";
+                    throw new MRCompilerException(msg, errCode,
+                            PigException.BUG);
+                }
+            }
             plan.remove(pack);
             try {
                 plan.replace(forEach, pack);
             } catch (PlanException e) {
                 int errCode = 2029;
-                String msg = "Error rewriting POJoinPackage.";
+                String msg = "Error rewriting join package.";
                 throw new MRCompilerException(msg, errCode, PigException.BUG, e);
             }
-
-            LogFactory.getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
+            mr.phyToMRMap.put(forEach, pack);
+            LogFactory.getLog(LastInputStreamingOptimizer.class).info(
+                    "Rewrite: POPackage->POForEach to POPackage(JoinPackager)");
         }
 
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Sat Apr 12 19:06:25 2014
@@ -691,7 +691,7 @@ class MultiQueryOptimizer extends MROpPl
             List<Packager> pkgs = ((MultiQueryPackager) fromPkgr)
                     .getPackagers();
             for (Packager p : pkgs) {
-                ((MultiQueryPackager) fromPkgr).addPackager(p);
+                ((MultiQueryPackager) toPkgr).addPackager(p);
                 pkCount++;
             }
             toPkgr.addIsKeyWrappedList(((MultiQueryPackager) fromPkgr)
@@ -733,7 +733,7 @@ class MultiQueryOptimizer extends MROpPl
         }
 
         if (toPkgr.isSameMapKeyType()) {
-            toPkgr.setKeyType(pk.getPkgr().getKeyType());
+            toPkgr.setKeyType(fromPkgr.getKeyType());
         } else {
             toPkgr.setKeyType(DataType.TUPLE);
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Sat Apr 12 19:06:25 2014
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
@@ -47,6 +48,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
@@ -191,6 +193,15 @@ public class PlanPrinter<O extends Opera
           else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));
           }
+          else if(node instanceof POPackage){
+            Packager pkgr = ((POPackage) node).getPkgr();
+            if(pkgr instanceof MultiQueryPackager){
+              List<Packager> pkgrs = ((MultiQueryPackager) pkgr).getPackagers();
+              for (Packager child : pkgrs){
+                  sb.append(LSep + child.name() + "\n");
+              }
+            }
+          }
           else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;
             List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Sat Apr 12 19:06:25 2014
@@ -33,6 +33,7 @@ import javax.xml.transform.stream.Stream
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -40,16 +41,19 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+import org.w3c.dom.Node;
 
 
 public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
@@ -175,6 +179,11 @@ public class XMLPhysicalPlanPrinter<P ex
             subPlans = ((POSplit)node).getPlans();
         } else if (node instanceof PODemux) {
             subPlans = ((PODemux)node).getPlans();
+        } else if(node instanceof POPackage){
+            childNode = createPONode(node);
+            Packager pkgr = ((POPackage) node).getPkgr();
+            Node pkgrNode = createPackagerNode(pkgr);
+            childNode.appendChild(pkgrNode);
         } else if(node instanceof POFRJoin){
             childNode = createPONode(node);
             POFRJoin frj = (POFRJoin)node;
@@ -215,4 +224,16 @@ public class XMLPhysicalPlanPrinter<P ex
             depthFirst(pred, childNode);
         }
     }
+
+    private Node createPackagerNode(Packager pkgr) {
+        Element pkgrNode = doc.createElement(pkgr.getClass().getSimpleName());
+        if (pkgr instanceof MultiQueryPackager) {
+            List<Packager> pkgrs = ((MultiQueryPackager) pkgr)
+                    .getPackagers();
+            for (Packager child : pkgrs) {
+                pkgrNode.appendChild(createPackagerNode(child));
+            }
+        }
+        return pkgrNode;
+    }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Sat Apr 12 19:06:25 2014
@@ -29,6 +29,7 @@ import org.apache.pig.data.InternalCache
 import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.Pair;
 /**
  * The package operator that packages the globally rearranged tuples into
@@ -55,12 +56,12 @@ public class CombinerPackager extends Pa
      * @param bags for each field, indicates whether it should be a bag (true)
      * or a simple field (false).
      */
-    public CombinerPackager(Packager pkgr, boolean[] bags) {
+    public CombinerPackager(Packager pkg, boolean[] bags) {
         super();
-        keyType = pkgr.keyType;
+        keyType = pkg.keyType;
         numInputs = 1;
-        inner = new boolean[pkgr.inner.length];
-        for (int i = 0; i < pkgr.inner.length; i++) {
+        inner = new boolean[pkg.inner.length];
+        for (int i = 0; i < pkg.inner.length; i++) {
             inner[i] = true;
         }
         if (bags != null) {
@@ -75,6 +76,7 @@ public class CombinerPackager extends Pa
     /**
      * @param keyInfo the keyInfo to set
      */
+    @Override
     public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
         this.keyInfo = keyInfo;
         // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
@@ -84,15 +86,15 @@ public class CombinerPackager extends Pa
         // has an index of 0. When we do support combiner in Cogroups
         // THIS WILL NEED TO BE REVISITED.
         Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-            keyInfo.get(0); // assumption: only group are "combinable", hence index 0
+                keyInfo.get(0); // assumption: only group are "combinable", hence index 0
         keyLookup = lrKeyInfo.second;
     }
 
     private DataBag createDataBag(int numBags) {
         String bagType = null;
         if (PigMapReduce.sJobConfInternal.get() != null) {
-               bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
-           }
+            bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+        }
 
         if (bagType != null && bagType.equalsIgnoreCase("default")) {
             return new NonSpillableDataBag();
@@ -137,10 +139,10 @@ public class CombinerPackager extends Pa
 
         detachInput();
 
-        // The successor of the POCombinerPackage as of
+        // The successor of the POPackage(Combiner) as of
         // now SHOULD be a POForeach which has been adjusted
         // to look for its inputs by projecting from the corresponding
-        // positions in the POCombinerPackage output.
+        // positions in the POPackage(Combiner) output.
         // So we will NOT be adding the key in the result here but merely
         // putting all bags into a result tuple and returning it.
         Tuple res;
@@ -150,11 +152,12 @@ public class CombinerPackager extends Pa
         r.result = res;
         r.returnStatus = POStatus.STATUS_OK;
         return r;
+
     }
 
     @Override
-    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
-            throws ExecException {
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
         return (Tuple) ntup.getValueAsPigType();
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Sat Apr 12 19:06:25 2014
@@ -21,28 +21,23 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.pen.Illustrator;
 
 public class JoinPackager extends Packager {
 
     private static final long serialVersionUID = 1L;
-
     private POOptimizedForEach forEach;
     private boolean newKey = true;
     private Tuple res = null;
     private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
-    private boolean firstTime = true;
-    private boolean useDefaultBag = false;
 
     public static final String DEFAULT_CHUNK_SIZE = "1000";
 
@@ -80,9 +75,9 @@ public class JoinPackager extends Packag
      * Calls getNext to get next ForEach result. The input for POJoinPackage is
      * a (key, NullableTuple) pair. We will materialize n-1 inputs into bags, feed input#n
      * one tuple a time to the delegated ForEach operator, the input for ForEach is
-     *
+     * 
      *     (input#1, input#2, input#3....input#n[i]), i=(1..k), suppose input#n consists
-     *
+     * 
      * of k tuples.
      * For every ForEach input, pull all the results from ForEach.
      * getNext will be called multiple times for a particular input,
@@ -91,17 +86,6 @@ public class JoinPackager extends Packag
      */
     @Override
     public Result getNext() throws ExecException {
-
-        if(firstTime){
-            firstTime = false;
-            if (PigMapReduce.sJobConfInternal.get() != null) {
-                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
-                if (bagType != null && bagType.equalsIgnoreCase("default")) {
-                    useDefaultBag = true;
-                }
-            }
-        }
-
         Tuple it = null;
 
         // If we see a new NullableTupleIterator, materialize n-1 inputs, construct ForEach input
@@ -113,20 +97,7 @@ public class JoinPackager extends Packag
             // Put n-1 inputs into bags
             dbs = new DataBag[numInputs];
             for (int i = 0; i < numInputs - 1; i++) {
-                if (!readOnce[i]) {
-                    dbs[i] = bags[i];
-                } else {
-                    dbs[i] = useDefaultBag ? BagFactory.getInstance()
-                            .newDefaultBag()
-                    // In a very rare case if there is a POStream after this
-                    // POJoinPackage in the pipeline and is also blocking the
-                    // pipeline;
-                    // constructor argument should be 2 * numInputs. But for one
-                    // obscure
-                    // case we don't want to pay the penalty all the time.
-                            : new InternalCachedBag(numInputs - 1);
-                    dbs[i].addAll(bags[i]);
-                }
+                dbs[i] = bags[i];
             }
 
             // For last bag, we always use NonSpillableBag.
@@ -218,9 +189,25 @@ public class JoinPackager extends Packag
     @Override
     public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
             throws ExecException {
-        super.attachInput(key, bags, readOnce);
+        checkBagType();
+
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // JoinPackager expects all but the last bag to be materialized
+        for (int i = 0; i < bags.length - 1; i++) {
+            if (readOnce[i]) {
+                DataBag materializedBag = getBag();
+                materializedBag.addAll(bags[i]);
+                bags[i] = materializedBag;
+            }
+        }
+        if (readOnce[numInputs - 1] != true) {
+            throw new ExecException(
+                    "JoinPackager expects the last input to be streamed");
+        }
         this.newKey = true;
-    };
+    }
 
     public List<PhysicalPlan> getInputPlans() {
         return forEach.getInputPlans();
@@ -247,4 +234,15 @@ public class JoinPackager extends Packag
     public void setChunkSize(long chunkSize) {
         this.chunkSize = chunkSize;
     }
+
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        this.illustrator = illustrator;
+        forEach.setIllustrator(illustrator);
+    }
+
+    @Override
+    public String name() {
+        return this.getClass().getSimpleName() + "(" + forEach.getFlatStr() + ")";
+    }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Sat Apr 12 19:06:25 2014
@@ -17,20 +17,24 @@
  */
 
 /**
- *
+ * 
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This package operator is a specialization
@@ -41,11 +45,14 @@ import org.apache.pig.impl.util.Pair;
 public class LitePackager extends Packager {
 
     private static final long serialVersionUID = 1L;
+    private PigNullableWritable keyWritable;
 
+    @Override
     public boolean[] getInner() {
         return null;
     }
 
+    @Override
     public void setInner(boolean[] inner) {
     }
 
@@ -58,12 +65,7 @@ public class LitePackager extends Packag
         LitePackager clone = (LitePackager) super.clone();
         clone.inner = null;
         if (keyInfo != null) {
-            clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-
-            for (Entry<Integer, Pair<Boolean, Map<Integer, Integer>>> entry : keyInfo
-                    .entrySet()) {
-                clone.keyInfo.put(entry.getKey(), entry.getValue());
-            }
+            clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(keyInfo);
         }
         return clone;
     }
@@ -84,27 +86,6 @@ public class LitePackager extends Packag
     }
 
     /**
-     * @return the isKeyTuple
-     */
-    public boolean getKeyTuple() {
-        return isKeyTuple;
-    }
-
-    /**
-     * @return the keyAsTuple
-     */
-    public Tuple getKeyAsTuple() {
-        return isKeyTuple ? (Tuple) key : null;
-    }
-
-    /**
-     * @return the key
-     */
-    public Object getKey() {
-        return key;
-    }
-
-    /**
      * Similar to POPackage.getNext except that
      * only one input is expected with index 0
      * and ReadOnceBag is used instead of
@@ -127,27 +108,48 @@ public class LitePackager extends Packag
         detachInput();
         Result r = new Result();
         r.returnStatus = POStatus.STATUS_OK;
-        r.result = res;
+        r.result = illustratorMarkup(null, res, 0);
         return r;
     }
 
     /**
-     * Makes use of the superclass method, but this requires
-     * an additional parameter key passed by ReadOnceBag.
-     * key of this instance will be set to null in detachInput
-     * call, but an instance of ReadOnceBag may have the original
-     * key that it uses. Therefore this extra argument is taken
-     * to temporarily set it before the call to the superclass method
-     * and then restore it.
+     * Makes use of the superclass method, but this requires an additional
+     * parameter key passed by ReadOnceBag. key of this instance will be set to
+     * null in detachInput call, but an instance of ReadOnceBag may have the
+     * original key that it uses. Therefore this extra argument is taken to
+     * temporarily set it before the call to the superclass method and then
+     * restore it.
      */
     @Override
-    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
-            throws ExecException {
-        Object origKey = this.key;
-        this.key = key;
-        Tuple retTuple = super.getValueTuple(key, ntup, index);
-        this.key = origKey;
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        PigNullableWritable origKey = this.keyWritable;
+        this.keyWritable = keyWritable;
+        Tuple retTuple = super.getValueTuple(keyWritable, ntup, index);
+        this.keyWritable = origKey;
         return retTuple;
     }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            if (illustrator.getEquivalenceClasses() == null) {
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < numInputs; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+            }
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            tOut.synthetic = false; // not expect this to be really used
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Sat Apr 12 19:06:25 2014
@@ -28,6 +28,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.NullableUnknownWritable;
 import org.apache.pig.impl.io.PigNullableWritable;
 
@@ -84,12 +85,12 @@ public class MultiQueryPackager extends 
      */
     private boolean inCombiner = false;
 
-    transient private PigNullableWritable myKey;
+    private PigNullableWritable keyWritable = null;
 
     /**
      * Appends the specified package object to the end of
      * the package list.
-     *
+     * 
      * @param pack package to be appended to the list
      */
     public void addPackager(Packager pkgr) {
@@ -99,7 +100,7 @@ public class MultiQueryPackager extends 
     /**
      * Appends the specified package object to the end of
      * the package list.
-     *
+     * 
      * @param pack package to be appended to the list
      * @param mapKeyType the map key type associated with the package
      */
@@ -113,7 +114,7 @@ public class MultiQueryPackager extends 
 
     /**
      * Returns the list of packages.
-     *
+     * 
      * @return the list of the packages
      */
     public List<Packager> getPackagers() {
@@ -133,7 +134,7 @@ public class MultiQueryPackager extends 
             return new Result(POStatus.STATUS_EOP, null);
         }
 
-        byte origIndex = myKey.getIndex();
+        byte origIndex = keyWritable.getIndex();
 
         int index = (int)origIndex;
         index &= idxPart;
@@ -151,9 +152,9 @@ public class MultiQueryPackager extends 
         // check to see if we need to unwrap the key. The keys may be
         // wrapped inside a tuple by LocalRearrange operator when jobs
         // with different map key types are merged
-        PigNullableWritable curKey = myKey;
+        PigNullableWritable curKey = keyWritable;
         if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
-            Tuple tup = (Tuple)myKey.getValueAsPigType();
+            Tuple tup = (Tuple) keyWritable.getValueAsPigType();
             curKey = HDataType.getWritableComparableTypes(tup.get(0),
                     pkgr.getKeyType());
             curKey.setIndex(origIndex);
@@ -193,14 +194,14 @@ public class MultiQueryPackager extends 
             myObj.setIndex(origIndex);
             tuple.set(0, myObj);
         }
-        // illustrator markup has been handled by "pack"
+        // illustrator markup has been handled by "pkgr"
         return res;
     }
 
     /**
      * Returns the list of booleans that indicates if the
      * key needs to unwrapped for the corresponding plan.
-     *
+     * 
      * @return the list of isKeyWrapped boolean values
      */
     public List<Boolean> getIsKeyWrappedList() {
@@ -209,7 +210,7 @@ public class MultiQueryPackager extends 
 
     /**
      * Adds a list of IsKeyWrapped boolean values
-     *
+     * 
      * @param lst the list of boolean values to add
      */
     public void addIsKeyWrappedList(List<Boolean> lst) {
@@ -234,4 +235,16 @@ public class MultiQueryPackager extends 
         return sameMapKeyType;
     }
 
+    @Override
+    public int getNumInputs(byte index) {
+        return packagers.get(((int) index) & idxPart).getNumInputs(index);
+    }
+
+    @Override
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        this.keyWritable = keyWritable;
+        return packagers.get(((int) index) & idxPart).getValueTuple(
+                keyWritable, ntup, index);
+    }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Sat Apr 12 19:06:25 2014
@@ -19,11 +19,8 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -35,6 +32,7 @@ import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
@@ -42,10 +40,7 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.Illustrator;
 
 /**
  * The package operator that packages
@@ -77,14 +72,6 @@ public class POPackage extends PhysicalO
     //key, no value.
     protected int numInputs;
 
-    // A mapping of input index to key information got from LORearrange
-    // for that index. The Key information is a pair of boolean, Map.
-    // The boolean indicates whether there is a lone project(*) in the
-    // cogroup by. If not, the Map has a mapping of column numbers in the
-    // "value" to column numbers in the "key" which contain the fields in
-    // the "value"
-    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
-
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -94,7 +81,7 @@ public class POPackage extends PhysicalO
 
     protected Packager pkgr;
 
-    private boolean[] readOnce;
+    protected PigNullableWritable keyWritable;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -115,16 +102,21 @@ public class POPackage extends PhysicalO
     public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
             Packager pkgr) {
         super(k, rp, inp);
-        this.numInputs = -1;
-        this.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+        numInputs = -1;
         this.pkgr = pkgr;
     }
 
     public POPackage(POPackage copy) {
         super(copy);
         this.numInputs = copy.numInputs;
-        this.keyInfo = copy.keyInfo;
         this.pkgr = copy.pkgr;
+        this.pkgr.keyInfo = copy.pkgr.keyInfo;
+    }
+
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        super.setIllustrator(illustrator);
+        pkgr.setIllustrator(illustrator);
     }
 
     @Override
@@ -160,6 +152,7 @@ public class POPackage extends PhysicalO
         try {
             tupIter = inp;
             key = pkgr.getKey(k);
+            keyWritable = k;
             inputAttached = true;
         } catch (Exception e) {
             throw new RuntimeException(
@@ -171,6 +164,7 @@ public class POPackage extends PhysicalO
     /**
      * attachInput's better half!
      */
+    @Override
     public void detachInput() {
         tupIter = null;
         key = null;
@@ -184,15 +178,12 @@ public class POPackage extends PhysicalO
     public void setNumInps(int numInps) {
         this.numInputs = numInps;
         pkgr.setNumInputs(numInps);
-        readOnce = new boolean[numInputs];
-        for (int i = 0; i < numInputs; i++)
-            readOnce[i] = false;
     }
 
     /**
-     * From the inputs, constructs the output tuple
-     * for this co-group in the required format which
-     * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
+     * From the inputs, constructs the output tuple for this co-group in the
+     * required format which is (key, {bag of tuples from input 1}, {bag of
+     * tuples from input 2}, ...)
      */
     @Override
     public Result getNextTuple() throws ExecException {
@@ -206,12 +197,18 @@ public class POPackage extends PhysicalO
                 }
             }
         }
+        int numInputs = pkgr.getNumInputs(keyWritable.getIndex());
+        boolean[] readOnce = new boolean[numInputs];
+        for (int i = 0; i < numInputs; i++)
+            readOnce[i] = false;
+
         if (isInputAttached()) {
             // Create numInputs bags
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
 
             if (isAccumulative()) {
+                readOnce[numInputs - 1] = false;
                 // create bag wrapper to pull tuples in many batches
                 // all bags have reference to the sample tuples buffer
                 // which contains tuples from one batch
@@ -223,16 +220,21 @@ public class POPackage extends PhysicalO
                 }
 
             } else {
+                readOnce[numInputs - 1] = true;
+                // We know the tuples will come sorted by index, so we can wrap
+                // the last input in a ReadOnceBag and let the Packager decide
+                // whether or not to read into memory
+
                 // create bag to pull all tuples out of iterator
                 for (int i = 0; i < numInputs; i++) {
                     dbs[i] = useDefaultBag ? BagFactory.getInstance()
                             .newDefaultBag()
-                    // In a very rare case if there is a POStream after this
-                    // POPackage in the pipeline and is also blocking the
-                    // pipeline;
-                    // constructor argument should be 2 * numInputs. But for one
-                    // obscure
-                    // case we don't want to pay the penalty all the time.
+                            // In a very rare case if there is a POStream after this
+                            // POPackage in the pipeline and is also blocking the
+                            // pipeline;
+                            // constructor argument should be 2 * numInputs. But for one
+                            // obscure
+                            // case we don't want to pay the penalty all the time.
                             : new InternalCachedBag(numInputs);
                 }
                 // For each indexed tup in the inp, sort them
@@ -241,8 +243,11 @@ public class POPackage extends PhysicalO
                 while (tupIter.hasNext()) {
                     NullableTuple ntup = tupIter.next();
                     int index = ntup.getIndex();
-                    Tuple copy = pkgr.getValueTuple(key,
-                            ntup, index);
+                    if (index == numInputs - 1) {
+                        dbs[index] = new PeekedBag(pkgr, ntup, tupIter, keyWritable);
+                        break;
+                    }
+                    Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
 
                     if (numInputs == 1) {
 
@@ -266,10 +271,7 @@ public class POPackage extends PhysicalO
             detachInput();
         }
 
-        Result r = pkgr.getNext();
-        Tuple packedTup = (Tuple) r.result;
-        packedTup = illustratorMarkup(null, packedTup, 0);
-        return r;
+        return pkgr.getNext();
     }
 
     public Packager getPkgr() {
@@ -278,6 +280,8 @@ public class POPackage extends PhysicalO
 
     public void setPkgr(Packager pkgr) {
         this.pkgr = pkgr;
+        pkgr.setParent(this);
+        pkgr.setIllustrator(illustrator);
     }
 
     /**
@@ -341,22 +345,24 @@ public class POPackage extends PhysicalO
                 if (iter.hasNext()) {
                     NullableTuple ntup = iter.next();
                     int index = ntup.getIndex();
-                    Tuple copy = pkgr.getValueTuple(key, ntup, index);
+                    Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
                     if (numInputs == 1) {
+
                         // this is for multi-query merge where
-                         // the numInputs is always 1, but the index
+                        // the numInputs is always 1, but the index
                         // (the position of the inner plan in the
                         // enclosed operator) may not be 1.
                         bags[0].add(copy);
-                     } else {
+                    } else {
                         bags[index].add(copy);
-                     }
+                    }
                 }else{
                     break;
                 }
             }
         }
 
+        @Override
         public void clear() {
             for(int i=0; i<bags.length; i++) {
                 bags[i].clear();
@@ -364,6 +370,7 @@ public class POPackage extends PhysicalO
             iter = null;
         }
 
+        @Override
         public Iterator<Tuple> getTuples(int index) {
             return bags[index].iterator();
         }
@@ -373,74 +380,77 @@ public class POPackage extends PhysicalO
         }
     };
 
-    public Tuple illustratorMarkup2(Object in, Object out) {
-       if(illustrator != null) {
-           ExampleTuple tOut = new ExampleTuple((Tuple) out);
-           illustrator.getLineage().insert(tOut);
-           tOut.synthetic = ((ExampleTuple) in).synthetic;
-           illustrator.getLineage().union(tOut, (Tuple) in);
-           return tOut;
-       } else
-           return (Tuple) out;
-    }
-
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-        if (illustrator != null) {
-            ExampleTuple tOut = new ExampleTuple((Tuple) out);
-            LineageTracer lineageTracer = illustrator.getLineage();
-            lineageTracer.insert(tOut);
-            Tuple tmp;
-            boolean synthetic = false;
-            if (illustrator.getEquivalenceClasses() == null) {
-                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
-                for (int i = 0; i < numInputs; ++i) {
-                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
-                    equivalenceClasses.add(equivalenceClass);
-                }
-                illustrator.setEquivalenceClasses(equivalenceClasses, this);
-            }
+        return pkgr.illustratorMarkup(in, out, eqClassIndex);
+    }
+
+    public int numberOfEquivalenceClasses() {
+        return pkgr.numberOfEquivalenceClasses();
+    }
+
+    // A ReadOnceBag that we've already "peeked" at
+    private static class PeekedBag extends ReadOnceBag {
+        private static final long serialVersionUID = 1L;
+        NullableTuple head;
+        int index;
 
-            if (pkgr.isDistinct()) {
-                int count;
-                for (count = 0; tupIter.hasNext(); ++count) {
-                    NullableTuple ntp = tupIter.next();
-                    tmp = (Tuple) ntp.getValueAsPigType();
-                    if (!tmp.equals(tOut))
-                        lineageTracer.union(tOut, tmp);
+        public PeekedBag(Packager pkgr, NullableTuple head,
+                Iterator<NullableTuple> tupIter,
+                PigNullableWritable keyWritable) {
+            super(pkgr, tupIter, keyWritable);
+            this.head = head;
+            this.index = head.getIndex();
+        }
+
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new Iterator<Tuple>() {
+                boolean headReturned = false;
+
+                @Override
+                public boolean hasNext() {
+                    if (!headReturned)
+                        return true;
+
+                    return tupIter.hasNext();
                 }
-                if (count > 1) // only non-distinct tuples are inserted into the equivalence class
-                    illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
-                illustrator.addData((Tuple) tOut);
-                return (Tuple) tOut;
-            }
-            boolean outInEqClass = true;
-            try {
-                for (int i = 1; i < numInputs + 1; i++) {
-                    DataBag dbs = (DataBag) ((Tuple) out).get(i);
-                    Iterator<Tuple> iter = dbs.iterator();
-                    if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
-                        outInEqClass = false;
-                    while (iter.hasNext()) {
-                        tmp = iter.next();
-                        // any of synthetic data in bags causes the output tuple to be synthetic
-                        if (!synthetic && ((ExampleTuple) tmp).synthetic)
-                            synthetic = true;
-                        lineageTracer.union(tOut, tmp);
+
+                @Override
+                public Tuple next() {
+                    if (!headReturned) {
+                        headReturned = true;
+                        try {
+                            return pkgr.getValueTuple(keyWritable, head,
+                                    head.getIndex());
+                        } catch (ExecException e) {
+                            throw new RuntimeException(
+                                    "PeekedBag failed to get value tuple : "
+                                            + e.toString());
+                        }
+                    }
+                    NullableTuple ntup = tupIter.next();
+                    Tuple ret = null;
+                    try {
+                        ret = pkgr.getValueTuple(keyWritable, ntup, index);
+                    } catch (ExecException e) {
+                        throw new RuntimeException(
+                                "PeekedBag failed to get value tuple : "
+                                        + e.toString());
+                    }
+                    if (getReporter() != null) {
+                        getReporter().progress();
                     }
+                    return ret;
                 }
-            } catch (ExecException e) {
-                // TODO better exception handling
-                throw new RuntimeException("Illustrator exception :"
-                        + e.getMessage());
-            }
-            if (outInEqClass)
-                illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
-            tOut.synthetic = synthetic;
-            illustrator.addData((Tuple) tOut);
-            return tOut;
-        } else
-            return (Tuple) out;
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException(
+                            "PeekedBag does not support removal");
+                }
+            };
+        }
     }
 
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sat Apr 12 19:06:25 2014
@@ -229,7 +229,7 @@ public class POPartialAgg extends Physic
                 }
             }
             avgTupleSize = estTotalMem / estTuples;
-            int totalTuples = memLimits.getCacheLimit();
+            long totalTuples = memLimits.getCacheLimit();
             LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
             firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
             secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Sat Apr 12 19:06:25 2014
@@ -2,20 +2,29 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.Illustrable;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
-public class Packager implements Serializable, Cloneable {
+public class Packager implements Illustrable, Serializable, Cloneable {
 
     private static final long serialVersionUID = 1L;
 
@@ -27,6 +36,8 @@ public class Packager implements Seriali
         GROUP, JOIN
     };
 
+    protected transient Illustrator illustrator = null;
+
     // The key being worked on
     Object key;
 
@@ -64,6 +75,11 @@ public class Packager implements Seriali
 
     private PackageType pkgType;
 
+    boolean firstTime = true;
+    boolean useDefaultBag = false;
+
+    protected POPackage parent = null;
+
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
@@ -78,9 +94,20 @@ public class Packager implements Seriali
 
     public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
             throws ExecException {
+        checkBagType();
+
         this.key = key;
         this.bags = bags;
         this.readOnce = readOnce;
+        // We assume that we need all bags materialized. Specialized subclasses
+        // may choose to handle this differently
+        for (int i = 0; i < bags.length; i++) {
+            if (readOnce[i]) {
+                DataBag materializedBag = getBag();
+                materializedBag.addAll(bags[i]);
+                bags[i] = materializedBag;
+            }
+        }
     }
 
     public Result getNext() throws ExecException {
@@ -112,19 +139,13 @@ public class Packager implements Seriali
                     }
                 }
 
-                if (!readOnce[i]) {
-                    res.set(i + 1, bag);
-                } else {
-                    DataBag readBag = mBagFactory.newDefaultBag();
-                    readBag.addAll(bag);
-                    res.set(i + 1, readBag);
-                }
+                res.set(i + 1, bag);
             }
         }
-        detachInput();
         Result r = new Result();
         r.returnStatus = POStatus.STATUS_OK;
-        r.result = res;
+        r.result = illustratorMarkup(null, res, 0);
+        detachInput();
         return r;
     }
 
@@ -133,8 +154,36 @@ public class Packager implements Seriali
         bags = null;
     }
 
-    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
-            throws ExecException {
+    protected Tuple illustratorMarkup2(Object in, Object out) {
+        if (illustrator != null) {
+            ExampleTuple tOut;
+            if (!(out instanceof ExampleTuple)) {
+                tOut = new ExampleTuple((Tuple) out);
+            } else {
+                tOut = (ExampleTuple) out;
+            }
+            illustrator.getLineage().insert(tOut);
+            tOut.synthetic = ((ExampleTuple) in).synthetic;
+            illustrator.getLineage().union(tOut, (Tuple) in);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
+
+    protected Tuple starMarkup(Tuple key, Tuple val, Tuple out){
+        if (illustrator != null){
+            Tuple copy = illustratorMarkup2(key, out);
+            // For distinct, we also need to retain lineage information from the values.
+            if (isDistinct())
+                copy = illustratorMarkup2(val, out);
+            return copy;
+        } else
+            return (Tuple) out;
+    }
+
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        Object key = getKey(keyWritable);
         // Need to make a copy of the value, as hadoop uses the same ntup
         // to represent each value.
         Tuple val = (Tuple) ntup.getValueAsPigType();
@@ -177,19 +226,19 @@ public class Packager implements Seriali
                     }
                 }
             }
-            // copy = illustratorMarkup2(val, copy);
+            copy = illustratorMarkup2(val, copy);
         } else if (isProjectStar) {
 
             // the whole "value" is present in the "key"
             copy = mTupleFactory.newTuple(((Tuple) key).getAll());
-            // copy = illustratorMarkup2((Tuple) key, copy);
+            copy = starMarkup((Tuple) key, val, copy);
         } else {
 
             // there is no field of the "value" in the
             // "key" - so just make a copy of what we got
             // as the "value"
             copy = mTupleFactory.newTuple(val.getAll());
-            // copy = illustratorMarkup2(val, copy);
+            copy = illustratorMarkup2(val, copy);
         }
         return copy;
     }
@@ -202,6 +251,27 @@ public class Packager implements Seriali
         this.keyType = keyType;
     }
 
+    /**
+     * @return the isKeyTuple
+     */
+    public boolean getKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @return the keyAsTuple
+     */
+    public Tuple getKeyAsTuple() {
+        return isKeyTuple ? (Tuple) key : null;
+    }
+
+    /**
+     * @return the key
+     */
+    public Object getKey() {
+        return key;
+    }
+
     public boolean[] getInner() {
         return inner;
     }
@@ -239,6 +309,15 @@ public class Packager implements Seriali
         return keyInfo;
     }
 
+    public Illustrator getIllustrator() {
+        return illustrator;
+    }
+
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        this.illustrator = illustrator;
+    }
+
     /**
      * @return the distinct
      */
@@ -265,6 +344,10 @@ public class Packager implements Seriali
         return pkgType;
     }
 
+    public int getNumInputs(byte index) {
+        return numInputs;
+    }
+
     public int getNumInputs() {
         return numInputs;
     }
@@ -279,7 +362,13 @@ public class Packager implements Seriali
         clone.setNumInputs(numInputs);
         clone.setPackageType(pkgType);
         clone.setDistinct(distinct);
-        clone.setInner(inner);
+        if (inner != null) {
+            clone.inner = new boolean[inner.length];
+            for (int i = 0; i < inner.length; i++) {
+                clone.inner[i] = inner[i];
+            }
+        } else
+            clone.inner = null;
         if (keyInfo != null)
             clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
                     keyInfo));
@@ -293,4 +382,97 @@ public class Packager implements Seriali
     public String name() {
         return this.getClass().getSimpleName();
     }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // All customized packagers are introduced during MR compilation.
+        // Illustrate happens before that, so we only have to focus on the basic
+        // POPackage
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            boolean synthetic = false;
+            if (illustrator.getEquivalenceClasses() == null) {
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < numInputs; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+            }
+
+            if (isDistinct()) {
+                int count = 0;
+                for (Tuple tmp : bags[0]){
+                    count++;
+                    if (!tmp.equals(tOut))
+                        lineageTracer.union(tOut, tmp);
+                }
+                if (count > 1) // only non-distinct tuples are inserted into the
+                    // equivalence class
+                    illustrator.getEquivalenceClasses().get(eqClassIndex)
+                    .add(tOut);
+                illustrator.addData((Tuple) tOut);
+                return (Tuple) tOut;
+            }
+            boolean outInEqClass = true;
+            try {
+                for (int i = 1; i < numInputs + 1; i++) {
+                    DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                    Iterator<Tuple> iter = dbs.iterator();
+                    if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2
+                        // records
+                        outInEqClass = false;
+                    while (iter.hasNext()) {
+                        Tuple tmp = iter.next();
+                        // any of synthetic data in bags causes the output tuple
+                        // to be synthetic
+                        if (!synthetic && ((ExampleTuple) tmp).synthetic)
+                            synthetic = true;
+                        lineageTracer.union(tOut, tmp);
+                    }
+                }
+            } catch (ExecException e) {
+                // TODO better exception handling
+                throw new RuntimeException("Illustrator exception :"
+                        + e.getMessage());
+            }
+            if (outInEqClass)
+                illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            tOut.synthetic = synthetic;
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
+
+    public void setParent(POPackage pack) {
+        parent = pack;
+    }
+
+    public int numberOfEquivalenceClasses() {
+        return 1;
+    }
+
+    public void checkBagType() {
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+    }
+
+    public DataBag getBag(){
+        return useDefaultBag ? mBagFactory.newDefaultBag()
+                // In a very rare case if there is a POStream after this
+                // POJoinPackage in the pipeline and is also blocking the pipeline;
+                // constructor argument should be 2 * numInputs. But for one obscure
+                // case we don't want to pay the penalty all the time.
+                : new InternalCachedBag(numInputs-1);
+    }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Sat Apr 12 19:06:25 2014
@@ -145,6 +145,7 @@ public class POShuffleTezLoad extends PO
             }
 
             key = pkgr.getKey(min);
+            keyWritable = min;
 
             DataBag[] bags = new DataBag[numInputs];
             POPackageTupleBuffer buffer = new POPackageTupleBuffer();
@@ -190,7 +191,7 @@ public class POShuffleTezLoad extends PO
                                 for (Object val : vals) {
                                     NullableTuple nTup = (NullableTuple) val;
                                     int index = nTup.getIndex();
-                                    Tuple tup = pkgr.getValueTuple(key, nTup, index);
+                                    Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
                                     bag.add(tup);
                                 }
                             }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Sat Apr 12 19:06:25 2014
@@ -47,12 +47,12 @@ public class AccumulatorOptimizerUtil {
         }
 
         // if POPackage is for distinct, just return
-        if (((POPackage) po_package).getPkgr().isDistinct()) {
+        if (pkgr.isDistinct()) {
             return;
         }
 
         // if any input to POPackage is inner, just return
-        boolean[] isInner = ((POPackage) po_package).getPkgr().getInner();
+        boolean[] isInner = pkgr.getInner();
         for (boolean b: isInner) {
             if (b) {
                 return;

Modified: pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java Sat Apr 12 19:06:25 2014
@@ -25,28 +25,25 @@ import java.util.Iterator;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 /**
- * This bag is specifically created for use by POPackageLite. So it has three
- * properties, the NullableTuple iterator, the key (Object) and the keyInfo
- * (Map<Integer, Pair<Boolean, Map<Integer, Integer>>>) all three
- * of which are required in the constructor call. This bag does not store
- * the tuples in memory, but has access to an iterator typically provided by
- * Hadoop. Use this when you already have an iterator over tuples and do not
- * want to copy over again to a new bag.
+ * This bag does not store the tuples in memory, but has access to an iterator
+ * typically provided by Hadoop. Use this when you already have an iterator over
+ * tuples and do not want to copy over again to a new bag.
  */
 public class ReadOnceBag implements DataBag {
 
     // The Packager that created this
-    LitePackager pkgr;
+    protected Packager pkgr;
 
     //The iterator of Tuples. Marked transient because we will never serialize this.
-    transient Iterator<NullableTuple> tupIter;
+    protected transient Iterator<NullableTuple> tupIter;
 
     // The key being worked on
-    Object key;
+    protected PigNullableWritable keyWritable;
 
     /**
      *
@@ -61,11 +58,11 @@ public class ReadOnceBag implements Data
      * @param tupIter Iterator<NullableTuple>
      * @param key Object
      */
-    public ReadOnceBag(LitePackager pkgr, Iterator<NullableTuple> tupIter,
-            Object key) {
+    public ReadOnceBag(Packager pkgr, Iterator<NullableTuple> tupIter,
+            PigNullableWritable keyWritable) {
         this.pkgr = pkgr;
         this.tupIter = tupIter;
-        this.key = key;
+        this.keyWritable = keyWritable;
     }
 
     /* (non-Javadoc)
@@ -217,7 +214,7 @@ public class ReadOnceBag implements Data
         return hash;
     }
 
-    class ReadOnceBagIterator implements Iterator<Tuple>
+    protected class ReadOnceBagIterator implements Iterator<Tuple>
     {
         /* (non-Javadoc)
          * @see java.util.Iterator#hasNext()
@@ -236,7 +233,7 @@ public class ReadOnceBag implements Data
             int index = ntup.getIndex();
             Tuple ret = null;
             try {
-                ret = pkgr.getValueTuple(key, ntup, index);
+                ret = pkgr.getValueTuple(keyWritable, ntup, index);
             } catch (ExecException e)
             {
                 throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());

Modified: pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java Sat Apr 12 19:06:25 2014
@@ -53,10 +53,10 @@ public abstract class SelfSpillBag exten
     public static class MemoryLimits {
 
         private long maxMemUsage;
-        private int cacheLimit = Integer.MAX_VALUE;
+        private long cacheLimit = Integer.MAX_VALUE;
         private long memUsage = 0;
         private long numObjsSizeChecked = 0;
-        
+
         private static float cachedMemUsage = 0.2F;
         private static long maxMem = 0;
         static {
@@ -99,11 +99,11 @@ public abstract class SelfSpillBag exten
          * 
          * @return number of objects limit
          */
-        public int getCacheLimit() {
+        public long getCacheLimit() {
             if (numObjsSizeChecked > 0) {
                 long avgUsage = memUsage / numObjsSizeChecked;
                 if (avgUsage > 0) {
-                    cacheLimit = (int) (maxMemUsage / avgUsage);
+                    cacheLimit = maxMemUsage / avgUsage;
                 }
             }
             return cacheLimit;

Modified: pig/branches/tez/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/PigContext.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/PigContext.java Sat Apr 12 19:06:25 2014
@@ -112,7 +112,7 @@ public class PigContext implements Seria
     transient private Map<URL, String> extraJarOriginalPaths = new HashMap<URL, String>();
 
     // jars needed for scripting udfs - jython.jar etc
-    public List<String> scriptJars = new ArrayList<String>(2);
+    transient public List<String> scriptJars = new ArrayList<String>(2);
 
     // jars that should not be merged in.
     // (some functions may come from pig.jar and we don't want the whole jar file.)
@@ -120,7 +120,7 @@ public class PigContext implements Seria
 
     // jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
     transient public Vector<String> predeployedJars = new Vector<String>(2);
-    
+
     // script files that are needed to run a job
     @Deprecated
     public List<String> scriptFiles = new ArrayList<String>();
@@ -259,9 +259,9 @@ public class PigContext implements Seria
         String pigJar = JarManager.findContainingJar(Main.class);
         String hadoopJar = JarManager.findContainingJar(FileSystem.class);
         if (pigJar != null) {
-            skipJars.add(pigJar);
+            addSkipJar(pigJar);
             if (!pigJar.equals(hadoopJar))
-                skipJars.add(hadoopJar);
+                addSkipJar(hadoopJar);
         }
 
         this.executionEngine = execType.getExecutionEngine(this);
@@ -320,18 +320,7 @@ public class PigContext implements Seria
      * @param path
      */
     public void addScriptFile(String path) {
-        if (path != null) {
-            aliasedScriptFiles.put(path.replaceFirst("^/", "").replaceAll(":", ""), new File(path));
-        }
-    }
-
-    public boolean hasJar(String path) {
-        for (URL url : extraJars) {
-            if (extraJarOriginalPaths.get(url).equals(path)) {
-                return true;
-            }
-        }
-        return false;
+        addScriptFile(path, path);
     }
 
     /**
@@ -346,6 +335,18 @@ public class PigContext implements Seria
         }
     }
 
+    public void addScriptJar(String path) {
+        if (path != null && !scriptJars.contains(path)) {
+            scriptJars.add(path);
+        }
+    }
+
+    public void addSkipJar(String path) {
+        if (path != null && !skipJars.contains(path)) {
+            skipJars.add(path);
+        }
+    }
+
     public void addJar(String path) throws MalformedURLException {
         if (path != null) {
             URL resource = (new File(path)).toURI().toURL();
@@ -354,14 +355,23 @@ public class PigContext implements Seria
     }
 
     public void addJar(URL resource, String originalPath) throws MalformedURLException{
-        if (resource != null) {
+        if (resource != null && !extraJars.contains(resource)) {
             extraJars.add(resource);
             extraJarOriginalPaths.put(resource, originalPath);
             classloader.addURL(resource);
             Thread.currentThread().setContextClassLoader(PigContext.classloader);
         }
     }
-    
+
+    public boolean hasJar(String path) {
+        for (URL url : extraJars) {
+            if (extraJarOriginalPaths.get(url).equals(path)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Adds the specified path to the predeployed jars list. These jars will 
      * never be included in generated job jar.
@@ -370,7 +380,9 @@ public class PigContext implements Seria
      * cluster to reduce the size of the job jar.
      */
     public void markJarAsPredeployed(String path) {
-        predeployedJars.add(path);
+        if (path != null && !predeployedJars.contains(path)) {
+            predeployedJars.add(path);
+        }
     }
 
     public String doParamSubstitution(InputStream in,

Modified: pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java (original)
+++ pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java Sat Apr 12 19:06:25 2014
@@ -196,10 +196,7 @@ public class IllustratorAttacher extends
 
     @Override
     public void visitPackage(POPackage pkg) throws VisitorException{
-         if (!(pkg.getPkgr() instanceof LitePackager) && pkg.getPkgr().isDistinct())
-             setIllustrator(pkg, 1);
-         else
-             setIllustrator(pkg, null);
+        setIllustrator(pkg, pkg.numberOfEquivalenceClasses());
     }
 
     @Override
@@ -210,17 +207,15 @@ public class IllustratorAttacher extends
         for (PhysicalPlan innerPlan : innerPlans)
           innerPlanAttach(nfe, innerPlan);
         List<PhysicalOperator> preds = mPlan.getPredecessors(nfe);
-        if (preds != null && preds.size() == 1 && preds.get(0) instanceof POPackage) {
-            POPackage pkg = (POPackage) preds.get(0);
-            if (!(pkg.getPkgr() instanceof LitePackager) && pkg.getPkgr().isDistinct()) {
-                // equivalence class of POPackage for DISTINCT needs to be used
-                // instead of the succeeding POForEach's equivalence class
-                setIllustrator(nfe, pkg.getIllustrator().getEquivalenceClasses());
-                nfe.getIllustrator().setEqClassesShared();
-            }
-        } else {
+        if (preds != null && preds.size() == 1
+                && preds.get(0) instanceof POPackage
+                && ((POPackage) preds.get(0)).getPkgr().isDistinct()) {
+            // equivalence class of POPackage for DISTINCT needs to be used
+            // instead of the succeeding POForEach's equivalence class
+            setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses());
+            nfe.getIllustrator().setEqClassesShared();
+        } else
             setIllustrator(nfe, 1);
-        }
     }
 
     @Override