You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/12 21:06:26 UTC
svn commit: r1586885 [1/2] - in /pig/branches/tez: ./ ivy/ src/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/fetch/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionen...
Author: cheolsoo
Date: Sat Apr 12 19:06:25 2014
New Revision: 1586885
URL: http://svn.apache.org/r1586885
Log:
Merge latest trunk changes
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
- copied unchanged from r1586680, pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
Modified:
pig/branches/tez/ (props changed)
pig/branches/tez/CHANGES.txt
pig/branches/tez/KEYS
pig/branches/tez/ivy/libraries.properties
pig/branches/tez/src/org/apache/pig/PigServer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java
pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java
pig/branches/tez/src/org/apache/pig/impl/PigContext.java
pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java
pig/branches/tez/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java
pig/branches/tez/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
pig/branches/tez/src/org/apache/pig/scripting/js/JsScriptEngine.java
pig/branches/tez/src/org/apache/pig/scripting/jython/JythonScriptEngine.java
pig/branches/tez/src/org/apache/pig/tools/grunt/GruntParser.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java
pig/branches/tez/src/pig-default.properties (props changed)
pig/branches/tez/test/e2e/pig/tests/macro.conf
pig/branches/tez/test/e2e/pig/tests/multiquery.conf
pig/branches/tez/test/e2e/pig/tests/turing_jython.conf
pig/branches/tez/test/org/apache/pig/test/TestExampleGenerator.java
pig/branches/tez/test/org/apache/pig/test/TestJobSubmission.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2 (props changed)
Propchange: pig/branches/tez/
------------------------------------------------------------------------------
Merged /pig/trunk:r1582882-1586680
Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Sat Apr 12 19:06:25 2014
@@ -30,6 +30,10 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
+
+PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
+
PIG-3449: Move JobCreationException to org.apache.pig.backend.hadoop.executionengine (cheolsoo)
PIG-3765: Ability to disable Pig commands and operators (prkommireddi)
@@ -96,9 +100,19 @@ PIG-3117: A debug mode in which pig does
PIG-3484: Make the size of pig.script property configurable (cheolsoo)
OPTIMIZATIONS
+
+PIG-3882: Multiquery off mode execution is not done in batch and very inefficient (rohini)
BUG FIXES
+PIG-3871: Replace org.python.google.* with com.google.* in imports (cheolsoo)
+
+PIG-3858: PigLogger/PigStatusReporter is not set for fetch tasks (lbendig via cheolsoo)
+
+PIG-3798: Registered jar in pig script are appended to the classpath multiple times (cheolsoo)
+
+PIG-3844: Make ScriptState InheritableThreadLocal for threads that need it (amatsukawa via cheolsoo)
+
PIG-3837: ant pigperf target is broken in trunk (cheolsoo)
PIG-3836: Pig signature has guava version dependency (amatsukawa via cheolsoo)
@@ -162,8 +176,6 @@ PIG-3641: Split "otherwise" producing in
PIG-3682: mvn-inst target does not install pig-h2.jar into local .m2 (raluri via aniket486)
-PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini)
-
PIG-3511: Security: Pig temporary directories might have world readable permissions (rohini)
PIG-3664: Piggy Bank XPath UDF can't be called (nezihyigitbasi via daijy)
@@ -263,6 +275,10 @@ PIG-3480: TFile-based tmpfile compressio
BUG FIXES
+PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini)
+
+PIG-3819: e2e tests containing "perl -e "print $_;" fails on Hadoop 2 (daijy)
+
PIG-3813: Rank column is assigned different uids everytime when schema is reset (cheolsoo)
PIG-3833: Relation loaded by AvroStorage with schema is projected incorrectly in foreach statement (jeongjinku via cheolsoo)
Modified: pig/branches/tez/KEYS
URL: http://svn.apache.org/viewvc/pig/branches/tez/KEYS?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/KEYS (original)
+++ pig/branches/tez/KEYS Sat Apr 12 19:06:25 2014
@@ -114,3 +114,35 @@ rllbZkbS2bye7GzOLvBNqk5Z2PuUGP9henu+Q807
YSsRv5xz/aBvo9tkoZtzYeOqDe7Ut2nkgc+DD2d6YWnmFw==
=moXg
-----END PGP PUBLIC KEY BLOCK-----
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG/MacGPG2 v2.0.22 (Darwin)
+Comment: GPGTools - https://gpgtools.org
+
+mQENBFM/xdgBCAC5ZzlSfaTAfzuSL50HbTkQQpEdk/oRPUDIeIIHPKoj8iaXxydP
+zdafXuFMnnJceJDbczfzsu7+owXQWV/WM/zQ8ZxXsdS2LSXPMhNkYhgJKk6VHoQc
+ssvlXr5C7kQrazeTt7IGn6eWJknGNMRivt5xpzLZ/LJfeqyiID7pl2VALxS6argN
+6zeVQJG5hk1+z2SiUrVvHDIHdH9NEZAMCclrQvXEqoTV3vO+56TRyzMVfU+zrkSM
+Pyuoii1ATjqjQT3HQyh31fIRUDD5fXDyeLbekBWWbvevtyiY4oBuk5dPgJ+3O5OV
+AWGbBVmTj+DZc8myI8dIg3saZ0D6ehzut8WPABEBAAG0InBya29tbWlyZWRkaSA8
+cHJhc2gxNzg0QGdtYWlsLmNvbT6JATcEEwEKACEFAlM/xdgCGwMFCwkIBwMFFQoJ
+CAsFFgIDAQACHgECF4AACgkQB2xWLNUxu+pENAgAl32vD24/omZa5tN+fxGKh0AD
+fYt8l+RNlhoa0ghJQWVta8siuak0mW90prnzUc1zKM4NTb1I9L8K2QfIq97DhnbU
+Cq0SRKn+hKsN/qW9dydTop8ooqzYpgF6aWx+mx6aooJhk3ZayLA1DaNIJ6frInhy
+l1j6VjDFvTXyvyf9JuhDiCMDzeFtcOJ12Yp2KDhbNKugAgMFbjmUicoRG7T4ImJ2
+N4ndKuYKOV8BLoSNjezohaHs+6PtLvYOsGMHGmu72fCCMsHg33FLHNjMS9+/2Vh9
+7QLv2/jImcJvKkw4JPd6I5LZD7AkOA4E3XE7J/cd2FmZXYQzmrERohcarAj4hLkB
+DQRTP8XYAQgAulCh4cEbmzFQTwjGjCh6r/hYw6ZAcltQMPbUxLatcysq3Y718YYV
+/MBgR+Dlqu41lR30S1QuRAH4tPVraFvuBojm7BOReXe5bQYuDShtyXdkRRxXKlA7
+FmdD783FAI2XMNlywO+Pq/4wJ/Cbb5HALwAuUF+oOTUnhkRoAS8Q665FxD4AwxgE
+WHW1UT/tyv0rEo9X4XGuufdXeBZoNpqdt16BsW/rmFrXesv//+lp9X8iLQMccd3l
+Jnv6v5e0WTBFbBChy5W0vEYFlZ5N6nyNNgmindTa+kKcdrkv3YP74KRBIG8TXBC9
+HUNXDGMqMAlmLGJtzp0yNupvkRp0t8pHoQARAQABiQEfBBgBCgAJBQJTP8XYAhsM
+AAoJEAdsVizVMbvqSWsH/isN8m8fRJFrVCJAlDb54ABeFa6xwGodddosKmrTi83s
+CJ5uJl+G3d2CK4kTePyj9VGoRw5BA+3anotGQXhcQzfjPcraepWkj6RbWV1tnEQm
+IzXtIIh/fsswEroIwZlrJ3/qI6u+DrZYF5GF9SfA+qTRjlH/8HOzdpAU2ZlBIzT/
+guy3kvmcHWXO7JHSjDNz4XGTbkLYkh20le1XTlMdDy1OSIIGRVWBgyWHICZHXNnk
+I/0TpEVFgcGGTT2CSqLAFUlFdL6CzpwpGXhUxzSmdW5ULIs4dm2GulCKwMArfQE+
+URdA1jknkPjIIC1wBetzMrIbfcsoZS4SLbZm9u8C+2k=
+=u7gH
+-----END PGP PUBLIC KEY BLOCK-----
Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Sat Apr 12 19:06:25 2014
@@ -91,5 +91,5 @@ mockito.version=1.8.4
jansi.version=1.9
asm.version=3.3.1
snappy.version=1.1.0.1
-tez.version=0.4.0-incubating
+tez.version=0.5.0-incubating-SNAPSHOT
parquet-pig-bundle.version=1.2.3
Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Sat Apr 12 19:06:25 2014
@@ -99,12 +99,12 @@ import org.apache.pig.parser.QueryParser
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.validator.BlackAndWhitelistFilter;
import org.apache.pig.validator.BlackAndWhitelistValidator;
import org.apache.pig.validator.PigCommandFilter;
@@ -154,8 +154,6 @@ public class PigServer {
protected final String scope = constructScope();
-
- private boolean isMultiQuery = true;
private boolean aggregateWarning = true;
private boolean validateEachStatement = false;
@@ -235,8 +233,6 @@ public class PigServer {
currDAG = new Graph(false);
aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
- isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties()
- .getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
jobName = pigContext.getProperties().getProperty(
PigContext.JOB_NAME,
@@ -347,7 +343,7 @@ public class PigServer {
if (currDAG != null) {
graphs.push(currDAG);
}
- currDAG = new Graph(isMultiQuery);
+ currDAG = new Graph(true);
}
/**
@@ -420,13 +416,7 @@ public class PigServer {
parseAndBuild();
}
- PigStats stats = null;
- if( !isMultiQuery ) {
- // ignore if multiquery is off
- stats = PigStats.get();
- } else {
- stats = execute();
- }
+ PigStats stats = execute();
return getJobs(stats);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Sat Apr 12 19:06:25 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -40,8 +41,9 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.EmptyPigStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTimeZone;
/**
@@ -128,6 +130,13 @@ public class FetchLauncher {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PigStatusReporter.getInstance().setFetchContext(new FetchContext());
+ pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
}
private void runPipeline(POStore posStore) throws IOException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Sat Apr 12 19:06:25 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sat Apr 12 19:06:25 2014
@@ -1610,7 +1610,7 @@ public class JobControlCompiler{
Path pathInHDFS = shipToHDFS(pigContext, conf, url);
// and add to the DistributedCache
DistributedCache.addFileToClassPath(pathInHDFS, conf);
- pigContext.skipJars.add(url.getPath());
+ pigContext.addSkipJar(url.getPath());
}
private static Path getCacheStagingDir(Configuration conf) throws IOException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Sat Apr 12 19:06:25 2014
@@ -2749,16 +2749,27 @@ public class MRCompiler extends PhyPlanV
JoinPackager pkgr = new JoinPackager(pack.getPkgr(), forEach);
pkgr.setChunkSize(Long.parseLong(chunkSize));
pack.setPkgr(pkgr);
+ List<PhysicalOperator> succs = plan.getSuccessors(forEach);
+ if (succs != null) {
+ if (succs.size() != 1) {
+ int errCode = 2028;
+ String msg = "ForEach can only have one successor. Found "
+ + succs.size() + " successors.";
+ throw new MRCompilerException(msg, errCode,
+ PigException.BUG);
+ }
+ }
plan.remove(pack);
try {
plan.replace(forEach, pack);
} catch (PlanException e) {
int errCode = 2029;
- String msg = "Error rewriting POJoinPackage.";
+ String msg = "Error rewriting join package.";
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
-
- LogFactory.getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
+ mr.phyToMRMap.put(forEach, pack);
+ LogFactory.getLog(LastInputStreamingOptimizer.class).info(
+ "Rewrite: POPackage->POForEach to POPackage(JoinPackager)");
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Sat Apr 12 19:06:25 2014
@@ -691,7 +691,7 @@ class MultiQueryOptimizer extends MROpPl
List<Packager> pkgs = ((MultiQueryPackager) fromPkgr)
.getPackagers();
for (Packager p : pkgs) {
- ((MultiQueryPackager) fromPkgr).addPackager(p);
+ ((MultiQueryPackager) toPkgr).addPackager(p);
pkCount++;
}
toPkgr.addIsKeyWrappedList(((MultiQueryPackager) fromPkgr)
@@ -733,7 +733,7 @@ class MultiQueryOptimizer extends MROpPl
}
if (toPkgr.isSameMapKeyType()) {
- toPkgr.setKeyType(pk.getPkgr().getKeyType());
+ toPkgr.setKeyType(fromPkgr.getKeyType());
} else {
toPkgr.setKeyType(DataType.TUPLE);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Sat Apr 12 19:06:25 2014
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
@@ -47,6 +48,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
@@ -191,6 +193,15 @@ public class PlanPrinter<O extends Opera
else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
}
+ else if(node instanceof POPackage){
+ Packager pkgr = ((POPackage) node).getPkgr();
+ if(pkgr instanceof MultiQueryPackager){
+ List<Packager> pkgrs = ((MultiQueryPackager) pkgr).getPackagers();
+ for (Packager child : pkgrs){
+ sb.append(LSep + child.name() + "\n");
+ }
+ }
+ }
else if(node instanceof POFRJoin){
POFRJoin frj = (POFRJoin)node;
List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Sat Apr 12 19:06:25 2014
@@ -33,6 +33,7 @@ import javax.xml.transform.stream.Stream
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -40,16 +41,19 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import org.w3c.dom.Node;
public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
@@ -175,6 +179,11 @@ public class XMLPhysicalPlanPrinter<P ex
subPlans = ((POSplit)node).getPlans();
} else if (node instanceof PODemux) {
subPlans = ((PODemux)node).getPlans();
+ } else if(node instanceof POPackage){
+ childNode = createPONode(node);
+ Packager pkgr = ((POPackage) node).getPkgr();
+ Node pkgrNode = createPackagerNode(pkgr);
+ childNode.appendChild(pkgrNode);
} else if(node instanceof POFRJoin){
childNode = createPONode(node);
POFRJoin frj = (POFRJoin)node;
@@ -215,4 +224,16 @@ public class XMLPhysicalPlanPrinter<P ex
depthFirst(pred, childNode);
}
}
+
+ private Node createPackagerNode(Packager pkgr) {
+ Element pkgrNode = doc.createElement(pkgr.getClass().getSimpleName());
+ if (pkgr instanceof MultiQueryPackager) {
+ List<Packager> pkgrs = ((MultiQueryPackager) pkgr)
+ .getPackagers();
+ for (Packager child : pkgrs) {
+ pkgrNode.appendChild(createPackagerNode(child));
+ }
+ }
+ return pkgrNode;
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Sat Apr 12 19:06:25 2014
@@ -29,6 +29,7 @@ import org.apache.pig.data.InternalCache
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.Pair;
/**
* The package operator that packages the globally rearranged tuples into
@@ -55,12 +56,12 @@ public class CombinerPackager extends Pa
* @param bags for each field, indicates whether it should be a bag (true)
* or a simple field (false).
*/
- public CombinerPackager(Packager pkgr, boolean[] bags) {
+ public CombinerPackager(Packager pkg, boolean[] bags) {
super();
- keyType = pkgr.keyType;
+ keyType = pkg.keyType;
numInputs = 1;
- inner = new boolean[pkgr.inner.length];
- for (int i = 0; i < pkgr.inner.length; i++) {
+ inner = new boolean[pkg.inner.length];
+ for (int i = 0; i < pkg.inner.length; i++) {
inner[i] = true;
}
if (bags != null) {
@@ -75,6 +76,7 @@ public class CombinerPackager extends Pa
/**
* @param keyInfo the keyInfo to set
*/
+ @Override
public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
this.keyInfo = keyInfo;
// TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
@@ -84,15 +86,15 @@ public class CombinerPackager extends Pa
// has an index of 0. When we do support combiner in Cogroups
// THIS WILL NEED TO BE REVISITED.
Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(0); // assumption: only group are "combinable", hence index 0
+ keyInfo.get(0); // assumption: only group are "combinable", hence index 0
keyLookup = lrKeyInfo.second;
}
private DataBag createDataBag(int numBags) {
String bagType = null;
if (PigMapReduce.sJobConfInternal.get() != null) {
- bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
- }
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ }
if (bagType != null && bagType.equalsIgnoreCase("default")) {
return new NonSpillableDataBag();
@@ -137,10 +139,10 @@ public class CombinerPackager extends Pa
detachInput();
- // The successor of the POCombinerPackage as of
+ // The successor of the POPackage(Combiner) as of
// now SHOULD be a POForeach which has been adjusted
// to look for its inputs by projecting from the corresponding
- // positions in the POCombinerPackage output.
+ // positions in the POPackage(Combiner) output.
// So we will NOT be adding the key in the result here but merely
// putting all bags into a result tuple and returning it.
Tuple res;
@@ -150,11 +152,12 @@ public class CombinerPackager extends Pa
r.result = res;
r.returnStatus = POStatus.STATUS_OK;
return r;
+
}
@Override
- public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
- throws ExecException {
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
return (Tuple) ntup.getValueAsPigType();
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Sat Apr 12 19:06:25 2014
@@ -21,28 +21,23 @@ import java.util.Iterator;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.pen.Illustrator;
public class JoinPackager extends Packager {
private static final long serialVersionUID = 1L;
-
private POOptimizedForEach forEach;
private boolean newKey = true;
private Tuple res = null;
private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
- private boolean firstTime = true;
- private boolean useDefaultBag = false;
public static final String DEFAULT_CHUNK_SIZE = "1000";
@@ -80,9 +75,9 @@ public class JoinPackager extends Packag
* Calls getNext to get next ForEach result. The input for POJoinPackage is
* a (key, NullableTuple) pair. We will materialize n-1 inputs into bags, feed input#n
* one tuple at a time to the delegated ForEach operator, the input for ForEach is
- *
+ *
* (input#1, input#2, input#3....input#n[i]), i=(1..k), suppose input#n consists
- *
+ *
* of k tuples.
* For every ForEach input, pull all the results from ForEach.
* getNext will be called multiple times for a particular input,
@@ -91,17 +86,6 @@ public class JoinPackager extends Packag
*/
@Override
public Result getNext() throws ExecException {
-
- if(firstTime){
- firstTime = false;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- useDefaultBag = true;
- }
- }
- }
-
Tuple it = null;
// If we see a new NullableTupleIterator, materialize n-1 inputs, construct ForEach input
@@ -113,20 +97,7 @@ public class JoinPackager extends Packag
// Put n-1 inputs into bags
dbs = new DataBag[numInputs];
for (int i = 0; i < numInputs - 1; i++) {
- if (!readOnce[i]) {
- dbs[i] = bags[i];
- } else {
- dbs[i] = useDefaultBag ? BagFactory.getInstance()
- .newDefaultBag()
- // In a very rare case if there is a POStream after this
- // POJoinPackage in the pipeline and is also blocking the
- // pipeline;
- // constructor argument should be 2 * numInputs. But for one
- // obscure
- // case we don't want to pay the penalty all the time.
- : new InternalCachedBag(numInputs - 1);
- dbs[i].addAll(bags[i]);
- }
+ dbs[i] = bags[i];
}
// For last bag, we always use NonSpillableBag.
@@ -218,9 +189,25 @@ public class JoinPackager extends Packag
@Override
public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
throws ExecException {
- super.attachInput(key, bags, readOnce);
+ checkBagType();
+
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // JoinPackager expects all but the last bag to be materialized
+ for (int i = 0; i < bags.length - 1; i++) {
+ if (readOnce[i]) {
+ DataBag materializedBag = getBag();
+ materializedBag.addAll(bags[i]);
+ bags[i] = materializedBag;
+ }
+ }
+ if (readOnce[numInputs - 1] != true) {
+ throw new ExecException(
+ "JoinPackager expects the last input to be streamed");
+ }
this.newKey = true;
- };
+ }
public List<PhysicalPlan> getInputPlans() {
return forEach.getInputPlans();
@@ -247,4 +234,15 @@ public class JoinPackager extends Packag
public void setChunkSize(long chunkSize) {
this.chunkSize = chunkSize;
}
+
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ this.illustrator = illustrator;
+ forEach.setIllustrator(illustrator);
+ }
+
+ @Override
+ public String name() {
+ return this.getClass().getSimpleName() + "(" + forEach.getFlatStr() + ")";
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Sat Apr 12 19:06:25 2014
@@ -17,20 +17,24 @@
*/
/**
- *
+ *
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
/**
* This package operator is a specialization
@@ -41,11 +45,14 @@ import org.apache.pig.impl.util.Pair;
public class LitePackager extends Packager {
private static final long serialVersionUID = 1L;
+ private PigNullableWritable keyWritable;
+ @Override
public boolean[] getInner() {
return null;
}
+ @Override
public void setInner(boolean[] inner) {
}
@@ -58,12 +65,7 @@ public class LitePackager extends Packag
LitePackager clone = (LitePackager) super.clone();
clone.inner = null;
if (keyInfo != null) {
- clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-
- for (Entry<Integer, Pair<Boolean, Map<Integer, Integer>>> entry : keyInfo
- .entrySet()) {
- clone.keyInfo.put(entry.getKey(), entry.getValue());
- }
+ clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(keyInfo);
}
return clone;
}
@@ -84,27 +86,6 @@ public class LitePackager extends Packag
}
/**
- * @return the isKeyTuple
- */
- public boolean getKeyTuple() {
- return isKeyTuple;
- }
-
- /**
- * @return the keyAsTuple
- */
- public Tuple getKeyAsTuple() {
- return isKeyTuple ? (Tuple) key : null;
- }
-
- /**
- * @return the key
- */
- public Object getKey() {
- return key;
- }
-
- /**
* Similar to POPackage.getNext except that
* only one input is expected with index 0
* and ReadOnceBag is used instead of
@@ -127,27 +108,48 @@ public class LitePackager extends Packag
detachInput();
Result r = new Result();
r.returnStatus = POStatus.STATUS_OK;
- r.result = res;
+ r.result = illustratorMarkup(null, res, 0);
return r;
}
/**
- * Makes use of the superclass method, but this requires
- * an additional parameter key passed by ReadOnceBag.
- * key of this instance will be set to null in detachInput
- * call, but an instance of ReadOnceBag may have the original
- * key that it uses. Therefore this extra argument is taken
- * to temporarily set it before the call to the superclass method
- * and then restore it.
+ * Makes use of the superclass method, but this requires an additional
+ * parameter key passed by ReadOnceBag. key of this instance will be set to
+ * null in detachInput call, but an instance of ReadOnceBag may have the
+ * original key that it uses. Therefore this extra argument is taken to
+ * temporarily set it before the call to the superclass method and then
+ * restore it.
*/
@Override
- public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
- throws ExecException {
- Object origKey = this.key;
- this.key = key;
- Tuple retTuple = super.getValueTuple(key, ntup, index);
- this.key = origKey;
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ PigNullableWritable origKey = this.keyWritable;
+ this.keyWritable = keyWritable;
+ Tuple retTuple = super.getValueTuple(keyWritable, ntup, index);
+ this.keyWritable = origKey;
return retTuple;
}
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ if (illustrator != null) {
+ ExampleTuple tOut = new ExampleTuple((Tuple) out);
+ LineageTracer lineageTracer = illustrator.getLineage();
+ lineageTracer.insert(tOut);
+ if (illustrator.getEquivalenceClasses() == null) {
+ LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ for (int i = 0; i < numInputs; ++i) {
+ IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+ equivalenceClasses.add(equivalenceClass);
+ }
+ illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+ }
+ illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+ tOut.synthetic = false; // not expect this to be really used
+ illustrator.addData((Tuple) tOut);
+ return tOut;
+ } else
+ return (Tuple) out;
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Sat Apr 12 19:06:25 2014
@@ -28,6 +28,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.NullableUnknownWritable;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -84,12 +85,12 @@ public class MultiQueryPackager extends
*/
private boolean inCombiner = false;
- transient private PigNullableWritable myKey;
+ private PigNullableWritable keyWritable = null;
/**
* Appends the specified package object to the end of
* the package list.
- *
+ *
* @param pkgr packager to be appended to the list
*/
public void addPackager(Packager pkgr) {
@@ -99,7 +100,7 @@ public class MultiQueryPackager extends
/**
* Appends the specified package object to the end of
* the package list.
- *
+ *
* @param pack package to be appended to the list
* @param mapKeyType the map key type associated with the package
*/
@@ -113,7 +114,7 @@ public class MultiQueryPackager extends
/**
* Returns the list of packages.
- *
+ *
* @return the list of the packages
*/
public List<Packager> getPackagers() {
@@ -133,7 +134,7 @@ public class MultiQueryPackager extends
return new Result(POStatus.STATUS_EOP, null);
}
- byte origIndex = myKey.getIndex();
+ byte origIndex = keyWritable.getIndex();
int index = (int)origIndex;
index &= idxPart;
@@ -151,9 +152,9 @@ public class MultiQueryPackager extends
// check to see if we need to unwrap the key. The keys may be
// wrapped inside a tuple by LocalRearrange operator when jobs
// with different map key types are merged
- PigNullableWritable curKey = myKey;
+ PigNullableWritable curKey = keyWritable;
if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
- Tuple tup = (Tuple)myKey.getValueAsPigType();
+ Tuple tup = (Tuple) keyWritable.getValueAsPigType();
curKey = HDataType.getWritableComparableTypes(tup.get(0),
pkgr.getKeyType());
curKey.setIndex(origIndex);
@@ -193,14 +194,14 @@ public class MultiQueryPackager extends
myObj.setIndex(origIndex);
tuple.set(0, myObj);
}
- // illustrator markup has been handled by "pack"
+ // illustrator markup has been handled by "pkgr"
return res;
}
/**
* Returns the list of booleans that indicates if the
* key needs to be unwrapped for the corresponding plan.
- *
+ *
* @return the list of isKeyWrapped boolean values
*/
public List<Boolean> getIsKeyWrappedList() {
@@ -209,7 +210,7 @@ public class MultiQueryPackager extends
/**
* Adds a list of IsKeyWrapped boolean values
- *
+ *
* @param lst the list of boolean values to add
*/
public void addIsKeyWrappedList(List<Boolean> lst) {
@@ -234,4 +235,16 @@ public class MultiQueryPackager extends
return sameMapKeyType;
}
+ @Override
+ public int getNumInputs(byte index) {
+ return packagers.get(((int) index) & idxPart).getNumInputs(index);
+ }
+
+ @Override
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ this.keyWritable = keyWritable;
+ return packagers.get(((int) index) & idxPart).getValueTuple(
+ keyWritable, ntup, index);
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Sat Apr 12 19:06:25 2014
@@ -19,11 +19,8 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -35,6 +32,7 @@ import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
@@ -42,10 +40,7 @@ import org.apache.pig.impl.io.PigNullabl
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.Illustrator;
/**
* The package operator that packages
@@ -77,14 +72,6 @@ public class POPackage extends PhysicalO
//key, no value.
protected int numInputs;
- // A mapping of input index to key information got from LORearrange
- // for that index. The Key information is a pair of boolean, Map.
- // The boolean indicates whether there is a lone project(*) in the
- // cogroup by. If not, the Map has a mapping of column numbers in the
- // "value" to column numbers in the "key" which contain the fields in
- // the "value"
- protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
-
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -94,7 +81,7 @@ public class POPackage extends PhysicalO
protected Packager pkgr;
- private boolean[] readOnce;
+ protected PigNullableWritable keyWritable;
public POPackage(OperatorKey k) {
this(k, -1, null);
@@ -115,16 +102,21 @@ public class POPackage extends PhysicalO
public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
Packager pkgr) {
super(k, rp, inp);
- this.numInputs = -1;
- this.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+ numInputs = -1;
this.pkgr = pkgr;
}
public POPackage(POPackage copy) {
super(copy);
this.numInputs = copy.numInputs;
- this.keyInfo = copy.keyInfo;
this.pkgr = copy.pkgr;
+ this.pkgr.keyInfo = copy.pkgr.keyInfo;
+ }
+
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ super.setIllustrator(illustrator);
+ pkgr.setIllustrator(illustrator);
}
@Override
@@ -160,6 +152,7 @@ public class POPackage extends PhysicalO
try {
tupIter = inp;
key = pkgr.getKey(k);
+ keyWritable = k;
inputAttached = true;
} catch (Exception e) {
throw new RuntimeException(
@@ -171,6 +164,7 @@ public class POPackage extends PhysicalO
/**
* attachInput's better half!
*/
+ @Override
public void detachInput() {
tupIter = null;
key = null;
@@ -184,15 +178,12 @@ public class POPackage extends PhysicalO
public void setNumInps(int numInps) {
this.numInputs = numInps;
pkgr.setNumInputs(numInps);
- readOnce = new boolean[numInputs];
- for (int i = 0; i < numInputs; i++)
- readOnce[i] = false;
}
/**
- * From the inputs, constructs the output tuple
- * for this co-group in the required format which
- * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
+ * From the inputs, constructs the output tuple for this co-group in the
+ * required format which is (key, {bag of tuples from input 1}, {bag of
+ * tuples from input 2}, ...)
*/
@Override
public Result getNextTuple() throws ExecException {
@@ -206,12 +197,18 @@ public class POPackage extends PhysicalO
}
}
}
+ int numInputs = pkgr.getNumInputs(keyWritable.getIndex());
+ boolean[] readOnce = new boolean[numInputs];
+ for (int i = 0; i < numInputs; i++)
+ readOnce[i] = false;
+
if (isInputAttached()) {
// Create numInputs bags
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
if (isAccumulative()) {
+ readOnce[numInputs - 1] = false;
// create bag wrapper to pull tuples in many batches
// all bags have reference to the sample tuples buffer
// which contains tuples from one batch
@@ -223,16 +220,21 @@ public class POPackage extends PhysicalO
}
} else {
+ readOnce[numInputs - 1] = true;
+ // We know the tuples will come sorted by index, so we can wrap
+ // the last input in a ReadOnceBag and let the Packager decide
+ // whether or not to read into memory
+
// create bag to pull all tuples out of iterator
for (int i = 0; i < numInputs; i++) {
dbs[i] = useDefaultBag ? BagFactory.getInstance()
.newDefaultBag()
- // In a very rare case if there is a POStream after this
- // POPackage in the pipeline and is also blocking the
- // pipeline;
- // constructor argument should be 2 * numInputs. But for one
- // obscure
- // case we don't want to pay the penalty all the time.
+ // In a very rare case if there is a POStream after this
+ // POPackage in the pipeline and is also blocking the
+ // pipeline;
+ // constructor argument should be 2 * numInputs. But for one
+ // obscure
+ // case we don't want to pay the penalty all the time.
: new InternalCachedBag(numInputs);
}
// For each indexed tup in the inp, sort them
@@ -241,8 +243,11 @@ public class POPackage extends PhysicalO
while (tupIter.hasNext()) {
NullableTuple ntup = tupIter.next();
int index = ntup.getIndex();
- Tuple copy = pkgr.getValueTuple(key,
- ntup, index);
+ if (index == numInputs - 1) {
+ dbs[index] = new PeekedBag(pkgr, ntup, tupIter, keyWritable);
+ break;
+ }
+ Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
if (numInputs == 1) {
@@ -266,10 +271,7 @@ public class POPackage extends PhysicalO
detachInput();
}
- Result r = pkgr.getNext();
- Tuple packedTup = (Tuple) r.result;
- packedTup = illustratorMarkup(null, packedTup, 0);
- return r;
+ return pkgr.getNext();
}
public Packager getPkgr() {
@@ -278,6 +280,8 @@ public class POPackage extends PhysicalO
public void setPkgr(Packager pkgr) {
this.pkgr = pkgr;
+ pkgr.setParent(this);
+ pkgr.setIllustrator(illustrator);
}
/**
@@ -341,22 +345,24 @@ public class POPackage extends PhysicalO
if (iter.hasNext()) {
NullableTuple ntup = iter.next();
int index = ntup.getIndex();
- Tuple copy = pkgr.getValueTuple(key, ntup, index);
+ Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
if (numInputs == 1) {
+
// this is for multi-query merge where
- // the numInputs is always 1, but the index
+ // the numInputs is always 1, but the index
// (the position of the inner plan in the
// enclosed operator) may not be 1.
bags[0].add(copy);
- } else {
+ } else {
bags[index].add(copy);
- }
+ }
}else{
break;
}
}
}
+ @Override
public void clear() {
for(int i=0; i<bags.length; i++) {
bags[i].clear();
@@ -364,6 +370,7 @@ public class POPackage extends PhysicalO
iter = null;
}
+ @Override
public Iterator<Tuple> getTuples(int index) {
return bags[index].iterator();
}
@@ -373,74 +380,77 @@ public class POPackage extends PhysicalO
}
};
- public Tuple illustratorMarkup2(Object in, Object out) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- illustrator.getLineage().insert(tOut);
- tOut.synthetic = ((ExampleTuple) in).synthetic;
- illustrator.getLineage().union(tOut, (Tuple) in);
- return tOut;
- } else
- return (Tuple) out;
- }
-
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if (illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- LineageTracer lineageTracer = illustrator.getLineage();
- lineageTracer.insert(tOut);
- Tuple tmp;
- boolean synthetic = false;
- if (illustrator.getEquivalenceClasses() == null) {
- LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
- for (int i = 0; i < numInputs; ++i) {
- IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
- equivalenceClasses.add(equivalenceClass);
- }
- illustrator.setEquivalenceClasses(equivalenceClasses, this);
- }
+ return pkgr.illustratorMarkup(in, out, eqClassIndex);
+ }
+
+ public int numberOfEquivalenceClasses() {
+ return pkgr.numberOfEquivalenceClasses();
+ }
+
+ // A ReadOnceBag that we've already "peeked" at
+ private static class PeekedBag extends ReadOnceBag {
+ private static final long serialVersionUID = 1L;
+ NullableTuple head;
+ int index;
- if (pkgr.isDistinct()) {
- int count;
- for (count = 0; tupIter.hasNext(); ++count) {
- NullableTuple ntp = tupIter.next();
- tmp = (Tuple) ntp.getValueAsPigType();
- if (!tmp.equals(tOut))
- lineageTracer.union(tOut, tmp);
+ public PeekedBag(Packager pkgr, NullableTuple head,
+ Iterator<NullableTuple> tupIter,
+ PigNullableWritable keyWritable) {
+ super(pkgr, tupIter, keyWritable);
+ this.head = head;
+ this.index = head.getIndex();
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new Iterator<Tuple>() {
+ boolean headReturned = false;
+
+ @Override
+ public boolean hasNext() {
+ if (!headReturned)
+ return true;
+
+ return tupIter.hasNext();
}
- if (count > 1) // only non-distinct tuples are inserted into the equivalence class
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- illustrator.addData((Tuple) tOut);
- return (Tuple) tOut;
- }
- boolean outInEqClass = true;
- try {
- for (int i = 1; i < numInputs + 1; i++) {
- DataBag dbs = (DataBag) ((Tuple) out).get(i);
- Iterator<Tuple> iter = dbs.iterator();
- if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
- outInEqClass = false;
- while (iter.hasNext()) {
- tmp = iter.next();
- // any of synthetic data in bags causes the output tuple to be synthetic
- if (!synthetic && ((ExampleTuple) tmp).synthetic)
- synthetic = true;
- lineageTracer.union(tOut, tmp);
+
+ @Override
+ public Tuple next() {
+ if (!headReturned) {
+ headReturned = true;
+ try {
+ return pkgr.getValueTuple(keyWritable, head,
+ head.getIndex());
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "PeekedBag failed to get value tuple : "
+ + e.toString());
+ }
+ }
+ NullableTuple ntup = tupIter.next();
+ Tuple ret = null;
+ try {
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "PeekedBag failed to get value tuple : "
+ + e.toString());
+ }
+ if (getReporter() != null) {
+ getReporter().progress();
}
+ return ret;
}
- } catch (ExecException e) {
- // TODO better exception handling
- throw new RuntimeException("Illustrator exception :"
- + e.getMessage());
- }
- if (outInEqClass)
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- tOut.synthetic = synthetic;
- illustrator.addData((Tuple) tOut);
- return tOut;
- } else
- return (Tuple) out;
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(
+ "PeekedBag does not support removal");
+ }
+ };
+ }
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sat Apr 12 19:06:25 2014
@@ -229,7 +229,7 @@ public class POPartialAgg extends Physic
}
}
avgTupleSize = estTotalMem / estTuples;
- int totalTuples = memLimits.getCacheLimit();
+ long totalTuples = memLimits.getCacheLimit();
LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Sat Apr 12 19:06:25 2014
@@ -2,20 +2,29 @@ package org.apache.pig.backend.hadoop.ex
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.Illustrable;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
-public class Packager implements Serializable, Cloneable {
+public class Packager implements Illustrable, Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@@ -27,6 +36,8 @@ public class Packager implements Seriali
GROUP, JOIN
};
+ protected transient Illustrator illustrator = null;
+
// The key being worked on
Object key;
@@ -64,6 +75,11 @@ public class Packager implements Seriali
private PackageType pkgType;
+ boolean firstTime = true;
+ boolean useDefaultBag = false;
+
+ protected POPackage parent = null;
+
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -78,9 +94,20 @@ public class Packager implements Seriali
public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
throws ExecException {
+ checkBagType();
+
this.key = key;
this.bags = bags;
this.readOnce = readOnce;
+ // We assume that we need all bags materialized. Specialized subclasses
+ // may choose to handle this differently
+ for (int i = 0; i < bags.length; i++) {
+ if (readOnce[i]) {
+ DataBag materializedBag = getBag();
+ materializedBag.addAll(bags[i]);
+ bags[i] = materializedBag;
+ }
+ }
}
public Result getNext() throws ExecException {
@@ -112,19 +139,13 @@ public class Packager implements Seriali
}
}
- if (!readOnce[i]) {
- res.set(i + 1, bag);
- } else {
- DataBag readBag = mBagFactory.newDefaultBag();
- readBag.addAll(bag);
- res.set(i + 1, readBag);
- }
+ res.set(i + 1, bag);
}
}
- detachInput();
Result r = new Result();
r.returnStatus = POStatus.STATUS_OK;
- r.result = res;
+ r.result = illustratorMarkup(null, res, 0);
+ detachInput();
return r;
}
@@ -133,8 +154,36 @@ public class Packager implements Seriali
bags = null;
}
- public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
- throws ExecException {
+ protected Tuple illustratorMarkup2(Object in, Object out) {
+ if (illustrator != null) {
+ ExampleTuple tOut;
+ if (!(out instanceof ExampleTuple)) {
+ tOut = new ExampleTuple((Tuple) out);
+ } else {
+ tOut = (ExampleTuple) out;
+ }
+ illustrator.getLineage().insert(tOut);
+ tOut.synthetic = ((ExampleTuple) in).synthetic;
+ illustrator.getLineage().union(tOut, (Tuple) in);
+ return tOut;
+ } else
+ return (Tuple) out;
+ }
+
+ protected Tuple starMarkup(Tuple key, Tuple val, Tuple out){
+ if (illustrator != null){
+ Tuple copy = illustratorMarkup2(key, out);
+ // For distinct, we also need to retain lineage information from the values.
+ if (isDistinct())
+ copy = illustratorMarkup2(val, out);
+ return copy;
+ } else
+ return (Tuple) out;
+ }
+
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ Object key = getKey(keyWritable);
// Need to make a copy of the value, as hadoop uses the same ntup
// to represent each value.
Tuple val = (Tuple) ntup.getValueAsPigType();
@@ -177,19 +226,19 @@ public class Packager implements Seriali
}
}
}
- // copy = illustratorMarkup2(val, copy);
+ copy = illustratorMarkup2(val, copy);
} else if (isProjectStar) {
// the whole "value" is present in the "key"
copy = mTupleFactory.newTuple(((Tuple) key).getAll());
- // copy = illustratorMarkup2((Tuple) key, copy);
+ copy = starMarkup((Tuple) key, val, copy);
} else {
// there is no field of the "value" in the
// "key" - so just make a copy of what we got
// as the "value"
copy = mTupleFactory.newTuple(val.getAll());
- // copy = illustratorMarkup2(val, copy);
+ copy = illustratorMarkup2(val, copy);
}
return copy;
}
@@ -202,6 +251,27 @@ public class Packager implements Seriali
this.keyType = keyType;
}
+ /**
+ * @return the isKeyTuple
+ */
+ public boolean getKeyTuple() {
+ return isKeyTuple;
+ }
+
+ /**
+ * @return the keyAsTuple
+ */
+ public Tuple getKeyAsTuple() {
+ return isKeyTuple ? (Tuple) key : null;
+ }
+
+ /**
+ * @return the key
+ */
+ public Object getKey() {
+ return key;
+ }
+
public boolean[] getInner() {
return inner;
}
@@ -239,6 +309,15 @@ public class Packager implements Seriali
return keyInfo;
}
+ public Illustrator getIllustrator() {
+ return illustrator;
+ }
+
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ this.illustrator = illustrator;
+ }
+
/**
* @return the distinct
*/
@@ -265,6 +344,10 @@ public class Packager implements Seriali
return pkgType;
}
+ public int getNumInputs(byte index) {
+ return numInputs;
+ }
+
public int getNumInputs() {
return numInputs;
}
@@ -279,7 +362,13 @@ public class Packager implements Seriali
clone.setNumInputs(numInputs);
clone.setPackageType(pkgType);
clone.setDistinct(distinct);
- clone.setInner(inner);
+ if (inner != null) {
+ clone.inner = new boolean[inner.length];
+ for (int i = 0; i < inner.length; i++) {
+ clone.inner[i] = inner[i];
+ }
+ } else
+ clone.inner = null;
if (keyInfo != null)
clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
keyInfo));
@@ -293,4 +382,97 @@ public class Packager implements Seriali
public String name() {
return this.getClass().getSimpleName();
}
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // All customized packagers are introduced during MRCompilation.
+ // Illustrate happens before that, so we only have to focus on the basic
+ // POPackage
+ if (illustrator != null) {
+ ExampleTuple tOut = new ExampleTuple((Tuple) out);
+ LineageTracer lineageTracer = illustrator.getLineage();
+ lineageTracer.insert(tOut);
+ boolean synthetic = false;
+ if (illustrator.getEquivalenceClasses() == null) {
+ LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ for (int i = 0; i < numInputs; ++i) {
+ IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+ equivalenceClasses.add(equivalenceClass);
+ }
+ illustrator.setEquivalenceClasses(equivalenceClasses, parent);
+ }
+
+ if (isDistinct()) {
+ int count = 0;
+ for (Tuple tmp : bags[0]){
+ count++;
+ if (!tmp.equals(tOut))
+ lineageTracer.union(tOut, tmp);
+ }
+ if (count > 1) // only non-distinct tuples are inserted into the
+ // equivalence class
+ illustrator.getEquivalenceClasses().get(eqClassIndex)
+ .add(tOut);
+ illustrator.addData((Tuple) tOut);
+ return (Tuple) tOut;
+ }
+ boolean outInEqClass = true;
+ try {
+ for (int i = 1; i < numInputs + 1; i++) {
+ DataBag dbs = (DataBag) ((Tuple) out).get(i);
+ Iterator<Tuple> iter = dbs.iterator();
+ if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2
+ // records
+ outInEqClass = false;
+ while (iter.hasNext()) {
+ Tuple tmp = iter.next();
+ // any of synthetic data in bags causes the output tuple
+ // to be synthetic
+ if (!synthetic && ((ExampleTuple) tmp).synthetic)
+ synthetic = true;
+ lineageTracer.union(tOut, tmp);
+ }
+ }
+ } catch (ExecException e) {
+ // TODO better exception handling
+ throw new RuntimeException("Illustrator exception :"
+ + e.getMessage());
+ }
+ if (outInEqClass)
+ illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+ tOut.synthetic = synthetic;
+ illustrator.addData((Tuple) tOut);
+ return tOut;
+ } else
+ return (Tuple) out;
+ }
+
+ public void setParent(POPackage pack) {
+ parent = pack;
+ }
+
+ public int numberOfEquivalenceClasses() {
+ return 1;
+ }
+
+ public void checkBagType() {
+ if(firstTime){
+ firstTime = false;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
+ }
+
+ public DataBag getBag(){
+ return useDefaultBag ? mBagFactory.newDefaultBag()
+ // In a very rare case if there is a POStream after this
+ // POJoinPackage in the pipeline and is also blocking the pipeline;
+ // constructor argument should be 2 * numInputs. But for one obscure
+ // case we don't want to pay the penalty all the time.
+ : new InternalCachedBag(numInputs-1);
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Sat Apr 12 19:06:25 2014
@@ -145,6 +145,7 @@ public class POShuffleTezLoad extends PO
}
key = pkgr.getKey(min);
+ keyWritable = min;
DataBag[] bags = new DataBag[numInputs];
POPackageTupleBuffer buffer = new POPackageTupleBuffer();
@@ -190,7 +191,7 @@ public class POShuffleTezLoad extends PO
for (Object val : vals) {
NullableTuple nTup = (NullableTuple) val;
int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(key, nTup, index);
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
bag.add(tup);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Sat Apr 12 19:06:25 2014
@@ -47,12 +47,12 @@ public class AccumulatorOptimizerUtil {
}
// if POPackage is for distinct, just return
- if (((POPackage) po_package).getPkgr().isDistinct()) {
+ if (pkgr.isDistinct()) {
return;
}
// if any input to POPackage is inner, just return
- boolean[] isInner = ((POPackage) po_package).getPkgr().getInner();
+ boolean[] isInner = pkgr.getInner();
for (boolean b: isInner) {
if (b) {
return;
Modified: pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/ReadOnceBag.java Sat Apr 12 19:06:25 2014
@@ -25,28 +25,25 @@ import java.util.Iterator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
/**
- * This bag is specifically created for use by POPackageLite. So it has three
- * properties, the NullableTuple iterator, the key (Object) and the keyInfo
- * (Map<Integer, Pair<Boolean, Map<Integer, Integer>>>) all three
- * of which are required in the constructor call. This bag does not store
- * the tuples in memory, but has access to an iterator typically provided by
- * Hadoop. Use this when you already have an iterator over tuples and do not
- * want to copy over again to a new bag.
+ * This bag does not store the tuples in memory, but has access to an iterator
+ * typically provided by Hadoop. Use this when you already have an iterator over
+ * tuples and do not want to copy over again to a new bag.
*/
public class ReadOnceBag implements DataBag {
// The Packager that created this
- LitePackager pkgr;
+ protected Packager pkgr;
//The iterator of Tuples. Marked transient because we will never serialize this.
- transient Iterator<NullableTuple> tupIter;
+ protected transient Iterator<NullableTuple> tupIter;
// The key being worked on
- Object key;
+ protected PigNullableWritable keyWritable;
/**
*
@@ -61,11 +58,11 @@ public class ReadOnceBag implements Data
* @param tupIter Iterator<NullableTuple>
* @param key Object
*/
- public ReadOnceBag(LitePackager pkgr, Iterator<NullableTuple> tupIter,
- Object key) {
+ public ReadOnceBag(Packager pkgr, Iterator<NullableTuple> tupIter,
+ PigNullableWritable keyWritable) {
this.pkgr = pkgr;
this.tupIter = tupIter;
- this.key = key;
+ this.keyWritable = keyWritable;
}
/* (non-Javadoc)
@@ -217,7 +214,7 @@ public class ReadOnceBag implements Data
return hash;
}
- class ReadOnceBagIterator implements Iterator<Tuple>
+ protected class ReadOnceBagIterator implements Iterator<Tuple>
{
/* (non-Javadoc)
* @see java.util.Iterator#hasNext()
@@ -236,7 +233,7 @@ public class ReadOnceBag implements Data
int index = ntup.getIndex();
Tuple ret = null;
try {
- ret = pkgr.getValueTuple(key, ntup, index);
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
} catch (ExecException e)
{
throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
Modified: pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java Sat Apr 12 19:06:25 2014
@@ -53,10 +53,10 @@ public abstract class SelfSpillBag exten
public static class MemoryLimits {
private long maxMemUsage;
- private int cacheLimit = Integer.MAX_VALUE;
+ private long cacheLimit = Integer.MAX_VALUE;
private long memUsage = 0;
private long numObjsSizeChecked = 0;
-
+
private static float cachedMemUsage = 0.2F;
private static long maxMem = 0;
static {
@@ -99,11 +99,11 @@ public abstract class SelfSpillBag exten
*
* @return number of objects limit
*/
- public int getCacheLimit() {
+ public long getCacheLimit() {
if (numObjsSizeChecked > 0) {
long avgUsage = memUsage / numObjsSizeChecked;
if (avgUsage > 0) {
- cacheLimit = (int) (maxMemUsage / avgUsage);
+ cacheLimit = maxMemUsage / avgUsage;
}
}
return cacheLimit;
Modified: pig/branches/tez/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/PigContext.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/PigContext.java Sat Apr 12 19:06:25 2014
@@ -112,7 +112,7 @@ public class PigContext implements Seria
transient private Map<URL, String> extraJarOriginalPaths = new HashMap<URL, String>();
// jars needed for scripting udfs - jython.jar etc
- public List<String> scriptJars = new ArrayList<String>(2);
+ transient public List<String> scriptJars = new ArrayList<String>(2);
// jars that should not be merged in.
// (some functions may come from pig.jar and we don't want the whole jar file.)
@@ -120,7 +120,7 @@ public class PigContext implements Seria
// jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
transient public Vector<String> predeployedJars = new Vector<String>(2);
-
+
// script files that are needed to run a job
@Deprecated
public List<String> scriptFiles = new ArrayList<String>();
@@ -259,9 +259,9 @@ public class PigContext implements Seria
String pigJar = JarManager.findContainingJar(Main.class);
String hadoopJar = JarManager.findContainingJar(FileSystem.class);
if (pigJar != null) {
- skipJars.add(pigJar);
+ addSkipJar(pigJar);
if (!pigJar.equals(hadoopJar))
- skipJars.add(hadoopJar);
+ addSkipJar(hadoopJar);
}
this.executionEngine = execType.getExecutionEngine(this);
@@ -320,18 +320,7 @@ public class PigContext implements Seria
* @param path
*/
public void addScriptFile(String path) {
- if (path != null) {
- aliasedScriptFiles.put(path.replaceFirst("^/", "").replaceAll(":", ""), new File(path));
- }
- }
-
- public boolean hasJar(String path) {
- for (URL url : extraJars) {
- if (extraJarOriginalPaths.get(url).equals(path)) {
- return true;
- }
- }
- return false;
+ addScriptFile(path, path);
}
/**
@@ -346,6 +335,18 @@ public class PigContext implements Seria
}
}
+ /**
+ * Appends a path to {@code scriptJars}, ignoring null and paths that are
+ * already in the list.
+ *
+ * @param path jar path to record; null is silently ignored
+ */
+ public void addScriptJar(String path) {
+ if (path != null && !scriptJars.contains(path)) {
+ scriptJars.add(path);
+ }
+ }
+
+ /**
+ * Appends a path to {@code skipJars}, ignoring null and paths that are
+ * already in the list.
+ *
+ * @param path jar path to record; null is silently ignored
+ */
+ public void addSkipJar(String path) {
+ if (path != null && !skipJars.contains(path)) {
+ skipJars.add(path);
+ }
+ }
+
public void addJar(String path) throws MalformedURLException {
if (path != null) {
URL resource = (new File(path)).toURI().toURL();
@@ -354,14 +355,23 @@ public class PigContext implements Seria
}
public void addJar(URL resource, String originalPath) throws MalformedURLException{
- if (resource != null) {
+ if (resource != null && !extraJars.contains(resource)) {
extraJars.add(resource);
extraJarOriginalPaths.put(resource, originalPath);
classloader.addURL(resource);
Thread.currentThread().setContextClassLoader(PigContext.classloader);
}
}
-
+
+ /**
+ * Tells whether one of the jars recorded in {@code extraJars} was
+ * registered under the given original path.
+ *
+ * @param path original path to look for; null never matches
+ * @return true if some registered jar URL maps to this original path,
+ * false otherwise
+ */
+ public boolean hasJar(String path) {
+ if (path == null) {
+ return false;
+ }
+ for (URL url : extraJars) {
+ // Compare from the non-null side: addJar(URL, String) stores the
+ // original path without a null check, so the looked-up value may be
+ // null and must not be dereferenced.
+ if (path.equals(extraJarOriginalPaths.get(url))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Adds the specified path to the predeployed jars list. These jars will
* never be included in generated job jar.
@@ -370,7 +380,9 @@ public class PigContext implements Seria
* cluster to reduce the size of the job jar.
*/
public void markJarAsPredeployed(String path) {
- predeployedJars.add(path);
+ if (path != null && !predeployedJars.contains(path)) {
+ predeployedJars.add(path);
+ }
}
public String doParamSubstitution(InputStream in,
Modified: pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1586885&r1=1586884&r2=1586885&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java (original)
+++ pig/branches/tez/src/org/apache/pig/pen/IllustratorAttacher.java Sat Apr 12 19:06:25 2014
@@ -196,10 +196,7 @@ public class IllustratorAttacher extends
@Override
public void visitPackage(POPackage pkg) throws VisitorException{
- if (!(pkg.getPkgr() instanceof LitePackager) && pkg.getPkgr().isDistinct())
- setIllustrator(pkg, 1);
- else
- setIllustrator(pkg, null);
+ setIllustrator(pkg, pkg.numberOfEquivalenceClasses());
}
@Override
@@ -210,17 +207,15 @@ public class IllustratorAttacher extends
for (PhysicalPlan innerPlan : innerPlans)
innerPlanAttach(nfe, innerPlan);
List<PhysicalOperator> preds = mPlan.getPredecessors(nfe);
- if (preds != null && preds.size() == 1 && preds.get(0) instanceof POPackage) {
- POPackage pkg = (POPackage) preds.get(0);
- if (!(pkg.getPkgr() instanceof LitePackager) && pkg.getPkgr().isDistinct()) {
- // equivalence class of POPackage for DISTINCT needs to be used
- // instead of the succeeding POForEach's equivalence class
- setIllustrator(nfe, pkg.getIllustrator().getEquivalenceClasses());
- nfe.getIllustrator().setEqClassesShared();
- }
- } else {
+ if (preds != null && preds.size() == 1
+ && preds.get(0) instanceof POPackage
+ && ((POPackage) preds.get(0)).getPkgr().isDistinct()) {
+ // equivalence class of POPackage for DISTINCT needs to be used
+ //instead of the succeeding POForEach's equivalence class
+ setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses());
+ nfe.getIllustrator().setEqClassesShared();
+ } else
setIllustrator(nfe, 1);
- }
}
@Override