You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [4/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/sr...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Thu Nov 27 12:49:54 2014
@@ -38,7 +38,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -57,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.builtin.SampleLoader;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -80,7 +80,7 @@ public class FetchOptimizer {
*/
public static boolean isFetchEnabled(PigContext pc) {
return "true".equalsIgnoreCase(
- pc.getProperties().getProperty(PigConfiguration.OPT_FETCH, "true"));
+ pc.getProperties().getProperty(PigConfiguration.PIG_OPT_FETCH, "true"));
}
/**
@@ -97,14 +97,20 @@ public class FetchOptimizer {
FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp);
fpv.visit();
// Plan is fetchable only if FetchablePlanVisitor returns true AND
- // limit is present in the plan. Limit is a safeguard. If the input
- // is large, and there is no limit, fetch optimizer will fetch the
- // entire input to the client. That can be dangerous.
- boolean isFetchable = fpv.isPlanFetchable() &&
- PlanHelper.containsPhysicalOperator(pp, POLimit.class);
- if (isFetchable)
- init(pp);
- return isFetchable;
+ // limit is present in the plan, i.e. the limit has been pushed up to the loader.
+ // Limit is a safeguard. If the input is large, and there is no limit,
+ // fetch optimizer will fetch the entire input to the client. That can be dangerous.
+ if (!fpv.isPlanFetchable()) {
+ return false;
+ }
+ for (POLoad load : PlanHelper.getPhysicalOperators(pp, POLoad.class)) {
+ if (load.getLimit() == -1) {
+ return false;
+ }
+ }
+ pc.getProperties().setProperty(PigImplConstants.CONVERTED_TO_FETCH, "true");
+ init(pp);
+ return true;
}
return false;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Thu Nov 27 12:49:54 2014
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -26,8 +28,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.util.UriUtil;
-import java.io.IOException;
-
/**
* Class that computes the size of output for file-based systems.
*/
@@ -43,19 +43,23 @@ public class FileBasedOutputSizeReader i
*/
@Override
public boolean supports(POStore sto, Configuration conf) {
- String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
- // Some store functions do not support file-based output reader (e.g.
- // HCatStorer), so they should be excluded.
- String unsupported = conf.get(
- PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
- if (unsupported != null) {
- for (String s : unsupported.split(",")) {
- if (s.equalsIgnoreCase(storeFuncName)) {
- return false;
+ boolean nullOrSupportedScheme = UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+ if (nullOrSupportedScheme) {
+ // Some store functions that do not have a scheme
+ // do not support the file-based output reader (e.g. HCatStorer),
+ // so they should be excluded.
+ String unsupported = conf.get(
+ PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
+ if (unsupported != null) {
+ String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
+ for (String s : unsupported.split(",")) {
+ if (s.equalsIgnoreCase(storeFuncName)) {
+ return false;
+ }
}
}
}
- return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+ return nullOrSupportedScheme;
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Thu Nov 27 12:49:54 2014
@@ -92,12 +92,27 @@ public class InputSizeReducerEstimator i
return reducers;
}
+ static long getTotalInputFileSize(Configuration conf,
+ List<POLoad> lds, Job job) throws IOException {
+ return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
+ }
+
/**
* Get the input size for as many inputs as possible. Inputs that do not report
* their size nor can pig look that up itself are excluded from this size.
+ *
+ * @param conf Configuration
+ * @param lds List of POLoads
+ * @param job Job
+ * @param max Maximum value of total input size that will trigger exit. Many
+ * times we're only interested whether the total input size is greater than
+ * X or not. In such case, we can exit the function early as soon as the max
+ * is reached.
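+ * @return the total input size in bytes (counting may stop once it exceeds max), or -1 if the size of an input cannot be determined
+ * @throws IOException if querying an input's size or the file system fails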
*/
static long getTotalInputFileSize(Configuration conf,
- List<POLoad> lds, Job job) throws IOException {
+ List<POLoad> lds, Job job, long max) throws IOException {
long totalInputFileSize = 0;
for (POLoad ld : lds) {
long size = getInputSizeFromLoader(ld, job);
@@ -115,8 +130,14 @@ public class InputSizeReducerEstimator i
FileStatus[] status = fs.globStatus(path);
if (status != null) {
for (FileStatus s : status) {
- totalInputFileSize += MapRedUtil.getPathLength(fs, s);
+ totalInputFileSize += MapRedUtil.getPathLength(fs, s, max);
+ if (totalInputFileSize > max) {
+ break;
+ }
}
+ } else {
+ // If file is not found, we should report -1
+ return -1;
}
} else {
// If we cannot estimate size of a location, we should report -1
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Nov 27 12:49:54 2014
@@ -17,9 +17,10 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import static org.apache.pig.PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR;
+import static org.apache.pig.PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY;
+
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -77,9 +78,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -163,9 +164,6 @@ public class JobControlCompiler{
public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
- private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
- private static final String REDUCER_ESTIMATOR_ARG_KEY = "pig.exec.reducer.estimator.arg";
-
public static final String PIG_MAP_COUNTER = "pig.counters.counter_";
public static final String PIG_MAP_RANK_NAME = "pig.rank_";
public static final String PIG_MAP_SEPARATOR = "_";
@@ -447,8 +445,8 @@ public class JobControlCompiler{
return false;
}
- long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job);
long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 100*1000*1000l);
+ long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job, inputByteMax);
log.info("Size of input: " + totalInputFileSize +" bytes. Small job threshold: " + inputByteMax );
if (totalInputFileSize < 0 || totalInputFileSize > inputByteMax) {
return false;
@@ -505,7 +503,7 @@ public class JobControlCompiler{
Path tmpLocation = null;
// add settings for pig statistics
- String setScriptProp = conf.get(PigConfiguration.INSERT_ENABLED, "true");
+ String setScriptProp = conf.get(PigConfiguration.PIG_SCRIPT_INFO_ENABLED, "true");
if (setScriptProp.equalsIgnoreCase("true")) {
MRScriptState ss = MRScriptState.get();
ss.addSettingsToConf(mro, conf);
@@ -546,42 +544,6 @@ public class JobControlCompiler{
nwJob.setNumReduceTasks(0);
}
- for (String udf : mro.UDFs) {
- if (udf.contains("GFCross")) {
- Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
- if (func instanceof GFCross) {
- String crossKey = ((GFCross)func).getCrossKey();
- // If non GFCross has been processed yet
- if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
- pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
- Integer.toString(nwJob.getNumReduceTasks()));
- }
- conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
- (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
- }
- }
- }
-
- if(lds!=null && lds.size()>0){
- for (POLoad ld : lds) {
- //Store the target operators for tuples read
- //from this input
- List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
- List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
- if(ldSucs!=null){
- for (PhysicalOperator operator2 : ldSucs) {
- ldSucKeys.add(operator2.getOperatorKey());
- }
- }
- inpTargets.add(ldSucKeys);
- inpSignatureLists.add(ld.getSignature());
- inpLimits.add(ld.getLimit());
- //Remove the POLoad from the plan
- if (!pigContext.inIllustrator)
- mro.mapPlan.remove(ld);
- }
- }
-
if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
{
if (okToRunLocal(nwJob, mro, lds)) {
@@ -610,6 +572,22 @@ public class JobControlCompiler{
conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true);
} else {
log.info(BIG_JOB_LOG_MSG);
+ // Search to see if we have any UDF/LoadFunc/StoreFunc that need to pack things into the
+ // distributed cache.
+ List<String> cacheFiles = new ArrayList<String>();
+ List<String> shipFiles = new ArrayList<String>();
+ UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan);
+ mapUdfCacheFileVisitor.visit();
+ cacheFiles.addAll(mapUdfCacheFileVisitor.getCacheFiles());
+ shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles());
+
+ UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan);
+ reduceUdfCacheFileVisitor.visit();
+ cacheFiles.addAll(reduceUdfCacheFileVisitor.getCacheFiles());
+ shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles());
+
+ setupDistributedCache(pigContext, conf, cacheFiles.toArray(new String[]{}), false);
+
// Setup the DistributedCache for this job
List<URL> allJars = new ArrayList<URL>();
@@ -619,6 +597,19 @@ public class JobControlCompiler{
}
}
+ for (String udf : mro.UDFs) {
+ Class clazz = pigContext.getClassForAlias(udf);
+ if (clazz != null) {
+ String jar = JarManager.findContainingJar(clazz);
+ if (jar!=null) {
+ URL jarURL = new File(jar).toURI().toURL();
+ if (!allJars.contains(jarURL)) {
+ allJars.add(jarURL);
+ }
+ }
+ }
+ }
+
for (String scriptJar : pigContext.scriptJars) {
URL jar = new File(scriptJar).toURI().toURL();
if (!allJars.contains(jar)) {
@@ -626,6 +617,13 @@ public class JobControlCompiler{
}
}
+ for (String shipFile : shipFiles) {
+ URL jar = new File(shipFile).toURI().toURL();
+ if (!allJars.contains(jar)) {
+ allJars.add(jar);
+ }
+ }
+
for (String defaultJar : JarManager.getDefaultJars()) {
URL jar = new File(defaultJar).toURI().toURL();
if (!allJars.contains(jar)) {
@@ -641,7 +639,6 @@ public class JobControlCompiler{
}
}
if (!predeployed) {
- log.info("Adding jar to DistributedCache: " + jar);
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
}
}
@@ -653,6 +650,37 @@ public class JobControlCompiler{
}
}
+ for (String udf : mro.UDFs) {
+ if (udf.contains("GFCross")) {
+ Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf));
+ if (func instanceof GFCross) {
+ String crossKey = ((GFCross)func).getCrossKey();
+ conf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey,
+ Integer.toString(mro.getRequestedParallelism()));
+ }
+ }
+ }
+
+ if(lds!=null && lds.size()>0){
+ for (POLoad ld : lds) {
+ //Store the target operators for tuples read
+ //from this input
+ List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+ List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+ if(ldSucs!=null){
+ for (PhysicalOperator operator2 : ldSucs) {
+ ldSucKeys.add(operator2.getOperatorKey());
+ }
+ }
+ inpTargets.add(ldSucKeys);
+ inpSignatureLists.add(ld.getSignature());
+ inpLimits.add(ld.getLimit());
+ //Remove the POLoad from the plan
+ if (!pigContext.inIllustrator)
+ mro.mapPlan.remove(ld);
+ }
+ }
+
if(Utils.isLocal(pigContext, conf)) {
ConfigurationUtil.replaceConfigForLocalMode(conf);
}
@@ -779,10 +807,6 @@ public class JobControlCompiler{
// serialized
setupDistributedCacheForJoin(mro, pigContext, conf);
- // Search to see if we have any UDFs that need to pack things into the
- // distributed cache.
- setupDistributedCacheForUdfs(mro, pigContext, conf);
-
SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
POPackage pack = null;
@@ -1022,9 +1046,9 @@ public class JobControlCompiler{
Configuration conf = nwJob.getConfiguration();
// set various parallelism into the job conf for later analysis, PIG-2779
- conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
- conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
- conf.setInt("pig.info.reducers.estimated.parallel", mro.estimatedParallelism);
+ conf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel);
+ conf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism);
+ conf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism);
// this is for backward compatibility, and we encourage to use runtimeParallelism at runtime
mro.requestedParallelism = jobParallelism;
@@ -1080,10 +1104,10 @@ public class JobControlCompiler{
MapReduceOper mapReducerOper) throws IOException {
Configuration conf = job.getConfiguration();
- PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
+ PigReducerEstimator estimator = conf.get(PIG_EXEC_REDUCER_ESTIMATOR) == null ?
new InputSizeReducerEstimator() :
PigContext.instantiateObjectFromParams(conf,
- REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
+ PIG_EXEC_REDUCER_ESTIMATOR, PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY, PigReducerEstimator.class);
log.info("Using reducer estimator: " + estimator.getClass().getName());
int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
@@ -1478,13 +1502,6 @@ public class JobControlCompiler{
.visit();
}
- private void setupDistributedCacheForUdfs(MapReduceOper mro,
- PigContext pigContext,
- Configuration conf) throws IOException {
- new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit();
- new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit();
- }
-
private static void setupDistributedCache(PigContext pigContext,
Configuration conf,
Properties properties, String key,
@@ -1633,11 +1650,50 @@ public class JobControlCompiler{
// Turn on the symlink feature
DistributedCache.createSymlink(conf);
- // REGISTER always copies locally the jar file. see PigServer.registerJar()
- Path pathInHDFS = shipToHDFS(pigContext, conf, url);
- // and add to the DistributedCache
- DistributedCache.addFileToClassPath(pathInHDFS, conf);
- pigContext.addSkipJar(url.getPath());
+ Path distCachePath = getExistingDistCacheFilePath(conf, url);
+ if (distCachePath != null) {
+ log.info("Jar file " + url + " already in DistributedCache as "
+ + distCachePath + ". Not copying to hdfs and adding again");
+ // Path already in dist cache
+ if (!HadoopShims.isHadoopYARN()) {
+ // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classpath.
+ // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
+ // But path may only be in 'mapred.cache.files' and not be in
+ // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
+ DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
+ }
+ }
+ else {
+ // REGISTER always copies locally the jar file. see PigServer.registerJar()
+ Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+ DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf));
+ log.info("Added jar " + url + " to DistributedCache through " + pathInHDFS);
+ }
+
+ }
+
+ private static Path getExistingDistCacheFilePath(Configuration conf, URL url) throws IOException {
+ URI[] cacheFileUris = DistributedCache.getCacheFiles(conf);
+ if (cacheFileUris != null) {
+ String fileName = url.getRef() == null ? FilenameUtils.getName(url.getPath()) : url.getRef();
+ for (URI cacheFileUri : cacheFileUris) {
+ Path path = new Path(cacheFileUri);
+ String cacheFileName = cacheFileUri.getFragment() == null ? path.getName() : cacheFileUri.getFragment();
+ // Match
+ // - if both filenames are same and no symlinks (or)
+ // - if both symlinks are same (or)
+ // - symlink of existing cache file is same as the name of the new file to be added.
+ // That would be the case when hbase-0.98.4.jar#hbase.jar is configured via Oozie
+ // and register hbase.jar is done in the pig script.
+ // If two different files are symlinked to the same name, then there is a conflict
+ // and hadoop itself does not guarantee which file will be symlinked to that name.
+ // So we are good.
+ if (fileName.equals(cacheFileName)) {
+ return path;
+ }
+ }
+ }
+ return null;
}
private static Path getCacheStagingDir(Configuration conf) throws IOException {
@@ -1763,6 +1819,8 @@ public class JobControlCompiler{
ArrayList<String> replicatedPath = new ArrayList<String>();
FileSpec[] newReplFiles = new FileSpec[replFiles.length];
+ long maxSize = Long.valueOf(pigContext.getProperties().getProperty(
+ PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, "1000000000"));
// the first input is not replicated
long sizeOfReplicatedInputs = 0;
@@ -1782,7 +1840,7 @@ public class JobControlCompiler{
Path path = new Path(replFiles[i].getFileName());
FileSystem fs = path.getFileSystem(conf);
sizeOfReplicatedInputs +=
- MapRedUtil.getPathLength(fs, fs.getFileStatus(path));
+ MapRedUtil.getPathLength(fs, fs.getFileStatus(path), maxSize);
}
newReplFiles[i] = new FileSpec(symlink,
(replFiles[i] == null ? null : replFiles[i].getFuncSpec()));
@@ -1790,9 +1848,7 @@ public class JobControlCompiler{
join.setReplFiles(newReplFiles);
- String maxSize = pigContext.getProperties().getProperty(
- PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, "1000000000");
- if (sizeOfReplicatedInputs > Long.parseLong(maxSize)){
+ if (sizeOfReplicatedInputs > maxSize) {
throw new VisitorException("Replicated input files size: "
+ sizeOfReplicatedInputs + " exceeds " +
PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES + ": " + maxSize);
@@ -1854,41 +1910,6 @@ public class JobControlCompiler{
}
}
- private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
-
- private PigContext pigContext = null;
- private Configuration conf = null;
-
- public UdfDistributedCacheVisitor(PhysicalPlan plan,
- PigContext pigContext,
- Configuration conf) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.pigContext = pigContext;
- this.conf = conf;
- }
-
- @Override
- public void visitUserFunc(POUserFunc func) throws VisitorException {
-
- // XXX Hadoop currently doesn't support distributed cache in local mode.
- // This line will be removed after the support is added
- if (Utils.isLocal(pigContext, conf)) return;
-
- // set up distributed cache for files indicated by the UDF
- String[] files = func.getCacheFiles();
- if (files == null) return;
-
- try {
- setupDistributedCache(pigContext, conf, files, false);
- } catch (IOException e) {
- String msg = "Internal error. Distributed cache could not " +
- "be set up for the requested files";
- throw new VisitorException(msg, e);
- }
- }
- }
-
private static class ParallelConstantVisitor extends PhyPlanVisitor {
private int rp;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 27 12:49:54 2014
@@ -1076,7 +1076,14 @@ public class MRCompiler extends PhyPlanV
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
- nonBlocking(op);
+ if (op.isMapSideOnly() && curMROp.isMapDone()) {
+ FileSpec fSpec = getTempFileSpec();
+ MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+ curMROp = startNew(fSpec, prevMROper);
+ curMROp.mapPlan.addAsLeaf(op);
+ } else {
+ nonBlocking(op);
+ }
List<PhysicalPlan> plans = op.getInputPlans();
if(plans!=null)
for (PhysicalPlan plan : plans) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Nov 27 12:49:54 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -637,10 +638,10 @@ public class MapReduceLauncher extends L
pc.getProperties().getProperty(
"last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE);
- String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
+ String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_COMBINER);
if (!pc.inIllustrator && !("true".equals(prop))) {
boolean doMapAgg =
- Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,"false"));
+ Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,"false"));
CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg);
co.visit();
//display the warning message(s) from the CombinerOptimizer
@@ -686,7 +687,7 @@ public class MapReduceLauncher extends L
fRem.visit();
boolean isMultiQuery =
- Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+ Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true"));
if (isMultiQuery) {
// reduces the number of MROpers in the MR plan generated
@@ -797,13 +798,13 @@ public class MapReduceLauncher extends L
throw new ExecException(backendException);
}
try {
- TaskReport[] mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+ Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
if (mapRep != null) {
getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
mapRep = null;
}
- TaskReport[] redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
if (redRep != null) {
getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(redRep);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Nov 27 12:49:54 2014
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
@@ -28,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -523,23 +525,32 @@ public class MapReduceOper extends Opera
}
private POCounter getCounterOperation() {
- PhysicalOperator operator;
- Iterator<PhysicalOperator> it = this.mapPlan.getLeaves().iterator();
-
- while(it.hasNext()) {
- operator = it.next();
- if(operator instanceof POCounter)
- return (POCounter) operator;
+ POCounter counter = getCounterOperation(this.mapPlan);
+ if (counter == null) {
+ counter = getCounterOperation(this.reducePlan);
}
+ return counter;
+ }
- it = this.reducePlan.getLeaves().iterator();
+ private POCounter getCounterOperation(PhysicalPlan plan) {
+ PhysicalOperator operator;
+ Iterator<PhysicalOperator> it = plan.getLeaves().iterator();
- while(it.hasNext()) {
+ while (it.hasNext()) {
operator = it.next();
- if(operator instanceof POCounter)
+ if (operator instanceof POCounter) {
return (POCounter) operator;
+ } else if (operator instanceof POStore) {
+ List<PhysicalOperator> preds = plan.getPredecessors(operator);
+ if (preds != null) {
+ for (PhysicalOperator pred : preds) {
+ if (pred instanceof POCounter) {
+ return (POCounter) pred;
+ }
+ }
+ }
+ }
}
-
return null;
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Nov 27 12:49:54 2014
@@ -121,6 +121,11 @@ class MultiQueryOptimizer extends MROpPl
+ " uses customPartitioner, do not merge it");
continue;
}
+ if (successor.isCounterOperation()) {
+ log.debug("Splittee " + successor.getOperatorKey().getId()
+ + " has POCounter, do not merge it");
+ continue;
+ }
if (isMapOnly(successor)) {
if (isSingleLoadMapperPlan(successor.mapPlan)
&& isSinglePredecessor(successor)) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Nov 27 12:49:54 2014
@@ -349,6 +349,7 @@ public class PhyPlanSetter extends PhyPl
@Override
public void visitPreCombinerLocalRearrange(
POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
+ super.visitPreCombinerLocalRearrange(preCombinerLocalRearrange);
preCombinerLocalRearrange.setParentPlan(parent);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Nov 27 12:49:54 2014
@@ -122,8 +122,10 @@ public class PigBytesRawComparator exten
if( dataByteArraysCompare ) {
rc = WritableComparator.compareBytes(b1, offset1, length1, b2, offset2, length2);
} else {
- // Subtract 2, one for null byte and one for index byte
- rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+ // Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign
+ // of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already
+ // takes that into account.
+ return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
}
} else {
// For sorting purposes two nulls are equal.
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Nov 27 12:49:54 2014
@@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.JVMReuseManager;
import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -73,6 +75,15 @@ public class PigCombiner {
PigContext pigContext = null;
private volatile boolean initialized = false;
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class);
+ }
+
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ firstTime = true;
+ }
+
/**
* Configures the Reduce plan, the POPackage operator
* and the reporter thread
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Thu Nov 27 12:49:54 2014
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.joda.time.DateTimeZone;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +30,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -54,6 +53,7 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -162,6 +162,7 @@ public abstract class PigGenericMapBase
Configuration job = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+ context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
PigMapReduce.sJobContext = context;
PigMapReduce.sJobConfInternal.set(context.getConfiguration());
PigMapReduce.sJobConf = context.getConfiguration();
@@ -214,11 +215,7 @@ public abstract class PigGenericMapBase
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
- String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
- if (dtzStr != null && dtzStr.length() > 0) {
- // ensure that the internal timezone is uniformly in UTC offset style
- DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
- }
+ Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get());
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Nov 27 12:49:54 2014
@@ -30,7 +30,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -57,8 +60,8 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
-import org.joda.time.DateTimeZone;
/**
* This class is the static Mapper & Reducer classes that
@@ -100,6 +103,17 @@ public class PigGenericMapReduce {
public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class);
+ }
+
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ sJobContext = null;
+ sJobConf = null;
+ sJobConfInternal = new ThreadLocal<Configuration>();
+ }
+
public static class Map extends PigMapBase {
@Override
@@ -306,6 +320,7 @@ public class PigGenericMapReduce {
pack = getPack(context);
Configuration jConf = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+ context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
sJobContext = context;
sJobConfInternal.set(context.getConfiguration());
sJobConf = context.getConfiguration();
@@ -347,11 +362,7 @@ public class PigGenericMapReduce {
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
- String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
- if (dtzStr != null && dtzStr.length() > 0) {
- // ensure that the internal timezone is uniformly in UTC offset style
- DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
- }
+ Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get());
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Nov 27 12:49:54 2014
@@ -56,17 +56,11 @@ public final class PigHadoopLogger imple
return logger;
}
- public void destroy() {
- if (reporter != null) {
- reporter.destroy();
- }
- reporter = null;
- }
-
public void setReporter(PigStatusReporter reporter) {
this.reporter = reporter;
}
+ @Override
@SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
String className = o.getClass().getName();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Nov 27 12:49:54 2014
@@ -93,6 +93,7 @@ public class PigInputFormat extends Inpu
Configuration conf = context.getConfiguration();
PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
.deserialize(conf.get("udf.import.list")));
+ MapRedUtil.setupUDFContext(conf);
LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Nov 27 12:49:54 2014
@@ -60,6 +60,13 @@ public class PigMapReduceCounter {
pOperator = mp.getPredecessors(pOperator).get(0);
}
}
+
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(
+ JobControlCompiler.PIG_MAP_RANK_NAME
+ + context.getJobID().toString(), taskID, 0);
+ }
}
/**
@@ -69,15 +76,11 @@ public class PigMapReduceCounter {
public void collect(Context context, Tuple tuple)
throws InterruptedException, IOException {
context.write(null, tuple);
- try {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null) {
- reporter.incrCounter(
- JobControlCompiler.PIG_MAP_RANK_NAME
- + context.getJobID().toString(), taskID, 1);
- }
- } catch (Exception ex) {
- log.error("Error on incrementer of PigMapCounter");
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(
+ JobControlCompiler.PIG_MAP_RANK_NAME
+ + context.getJobID().toString(), taskID, 1);
}
}
}
@@ -116,6 +119,7 @@ public class PigMapReduceCounter {
}
this.context = context;
+ incrementCounter(0L);
}
/**
@@ -127,21 +131,14 @@ public class PigMapReduceCounter {
* @param increment is the value to add to the corresponding global counter.
**/
public static void incrementCounter(Long increment) {
- try {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null) {
-
- if(leaf instanceof POCounter){
- reporter.incrCounter(
- JobControlCompiler.PIG_MAP_RANK_NAME
- + context.getJobID().toString(), taskID, increment);
- }
-
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ if(leaf instanceof POCounter){
+ reporter.incrCounter(
+ JobControlCompiler.PIG_MAP_RANK_NAME
+ + context.getJobID().toString(), taskID, increment);
}
- } catch (Exception ex) {
- log.error("Error on incrementer of PigReduceCounter");
}
-
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Thu Nov 27 12:49:54 2014
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
@@ -37,7 +36,6 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -50,19 +48,19 @@ import org.apache.pig.impl.util.ObjectSe
*/
@SuppressWarnings("unchecked")
public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
-
+
private enum Mode { SINGLE_STORE, MULTI_STORE};
-
+
/** the temporary directory for the multi store */
public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir";
/** the relative path that can be used to build a temporary
* place to store the output from a number of map-reduce tasks*/
public static final String PIG_TMP_PATH = "pig.tmp.path";
-
- List<POStore> reduceStores = null;
- List<POStore> mapStores = null;
- Configuration currentConf = null;
-
+
+ protected List<POStore> reduceStores = null;
+ protected List<POStore> mapStores = null;
+ protected Configuration currentConf = null;
+
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
throws IOException, InterruptedException {
@@ -97,27 +95,27 @@ public class PigOutputFormat extends Out
@SuppressWarnings("unchecked")
static public class PigRecordWriter
extends RecordWriter<WritableComparable, Tuple> {
-
+
/**
* the actual RecordWriter
*/
private RecordWriter wrappedWriter;
-
+
/**
* the StoreFunc for the single store
*/
private StoreFuncInterface sFunc;
-
+
/**
* Single Query or multi query
*/
private Mode mode;
-
- public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
+
+ public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
Mode mode)
- throws IOException {
+ throws IOException {
this.mode = mode;
-
+
if(mode == Mode.SINGLE_STORE) {
this.wrappedWriter = wrappedWriter;
this.sFunc = sFunc;
@@ -128,7 +126,7 @@ public class PigOutputFormat extends Out
/**
* We only care about the values, so we are going to skip the keys when
* we write.
- *
+ *
* @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object)
*/
@Override
@@ -142,7 +140,7 @@ public class PigOutputFormat extends Out
}
@Override
- public void close(TaskAttemptContext taskattemptcontext) throws
+ public void close(TaskAttemptContext taskattemptcontext) throws
IOException, InterruptedException {
if(mode == Mode.SINGLE_STORE) {
wrappedWriter.close(taskattemptcontext);
@@ -150,24 +148,24 @@ public class PigOutputFormat extends Out
}
}
-
+
/**
* Before delegating calls to underlying OutputFormat or OutputCommitter
* Pig needs to ensure the Configuration in the JobContext contains
- * the output location and StoreFunc
- * for the specific store - so set these up in the context for this specific
+ * the output location and StoreFunc
+ * for the specific store - so set these up in the context for this specific
* store
* @param jobContext the {@link JobContext}
* @param store the POStore
* @throws IOException on failure
*/
- public static void setLocation(JobContext jobContext, POStore store) throws
+ public static void setLocation(JobContext jobContext, POStore store) throws
IOException {
Job storeJob = new Job(jobContext.getConfiguration());
StoreFuncInterface storeFunc = store.getStoreFunc();
String outputLocation = store.getSFile().getFileName();
storeFunc.setStoreLocation(outputLocation, storeJob);
-
+
// the setStoreLocation() method would indicate to the StoreFunc
// to set the output location for its underlying OutputFormat.
// Typically OutputFormat's store the output location in the
@@ -176,7 +174,7 @@ public class PigOutputFormat extends Out
// OutputFormat might have set) and merge it with the Configuration
// we started with so that when this method returns the Configuration
// supplied as input has the updates.
- ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
+ ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
storeJob.getConfiguration());
}
@@ -187,20 +185,20 @@ public class PigOutputFormat extends Out
checkOutputSpecsHelper(reduceStores, jobcontext);
}
- private void checkOutputSpecsHelper(List<POStore> stores, JobContext
+ private void checkOutputSpecsHelper(List<POStore> stores, JobContext
jobcontext) throws IOException, InterruptedException {
for (POStore store : stores) {
// make a copy of the original JobContext so that
- // each OutputFormat get a different copy
+ // each OutputFormat get a different copy
JobContext jobContextCopy = HadoopShims.createJobContext(
jobcontext.getConfiguration(), jobcontext.getJobID());
-
+
// set output location
PigOutputFormat.setLocation(jobContextCopy, store);
-
+
StoreFuncInterface sFunc = store.getStoreFunc();
OutputFormat of = sFunc.getOutputFormat();
-
+
// The above call should have update the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
try {
@@ -224,23 +222,22 @@ public class PigOutputFormat extends Out
* @param currentConf2
* @param storeLookupKey
* @return
- * @throws IOException
+ * @throws IOException
*/
- private List<POStore> getStores(Configuration conf, String storeLookupKey)
+ private List<POStore> getStores(Configuration conf, String storeLookupKey)
throws IOException {
return (List<POStore>)ObjectSerializer.deserialize(
conf.get(storeLookupKey));
}
-
-
- private void setupUdfEnvAndStores(JobContext jobcontext)
+
+ protected void setupUdfEnvAndStores(JobContext jobcontext)
throws IOException{
Configuration newConf = jobcontext.getConfiguration();
-
- // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside
+
+ // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside
// construct of PigOutputCommitter, can make use of it
MapRedUtil.setupUDFContext(newConf);
-
+
// if there is a udf in the plan we would need to know the import
// path so we can instantiate the udf. This is required because
// we will be deserializing the POStores out of the plan in the next
@@ -261,13 +258,13 @@ public class PigOutputFormat extends Out
// config properties have changed (eg. creating stores).
currentConf = new Configuration(newConf);
}
-
+
/**
* Check if given property prop is same in configurations conf1, conf2
* @param prop
* @param conf1
* @param conf2
- * @return true if both are equal
+ * @return true if both are equal
*/
private boolean isConfPropEqual(String prop, Configuration conf1,
Configuration conf2) {
@@ -283,10 +280,10 @@ public class PigOutputFormat extends Out
}
@Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext
+ public OutputCommitter getOutputCommitter(TaskAttemptContext
taskattemptcontext) throws IOException, InterruptedException {
setupUdfEnvAndStores(taskattemptcontext);
-
+
// we return an instance of PigOutputCommitter to Hadoop - this instance
// will wrap the real OutputCommitter(s) belonging to the store(s)
return new PigOutputCommitter(taskattemptcontext,
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Thu Nov 27 12:49:54 2014
@@ -17,8 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import static org.apache.pig.PigConfiguration.TIME_UDFS;
-import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY;
import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
import java.io.IOException;
@@ -119,10 +119,10 @@ public class PigRecordReader extends Rec
idx = 0;
this.limit = limit;
initNextRecordReader();
- doTiming = inputSpecificConf.getBoolean(TIME_UDFS, false);
+ doTiming = inputSpecificConf.getBoolean(PIG_UDF_PROFILE, false);
if (doTiming) {
counterGroup = loadFunc.toString();
- timingFrequency = inputSpecificConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+ timingFrequency = inputSpecificConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Thu Nov 27 12:49:54 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
@@ -109,7 +110,7 @@ public class WeightedRangePartitioner ex
Map<String, Object> quantileMap = null;
Configuration conf;
if (!pigContext.getExecType().isLocal()) {
- conf = new Configuration(true);
+ conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
} else {
conf = new Configuration(false);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -100,7 +102,7 @@ public abstract class PhysicalOperator e
// Will be used by operators to report status or transmit heartbeat
// Should be set by the backends to appropriate implementations that
// wrap their own version of a reporter.
- public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
+ protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
// Will be used by operators to aggregate warning messages
// Should be set by the backends to appropriate implementations that
@@ -120,6 +122,10 @@ public abstract class PhysicalOperator e
private List<OriginalLocation> originalLocations = new ArrayList<OriginalLocation>();
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class);
+ }
+
public PhysicalOperator(OperatorKey k) {
this(k, -1, null);
}
@@ -402,9 +408,9 @@ public abstract class PhysicalOperator e
}
public Result getNextDataBag() throws ExecException {
- Result ret = null;
+ Result val = new Result();
DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
- for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
+ for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
if (ret.returnStatus == POStatus.STATUS_ERR) {
return ret;
} else if (ret.returnStatus == POStatus.STATUS_NULL) {
@@ -413,9 +419,9 @@ public abstract class PhysicalOperator e
tmpBag.add((Tuple) ret.result);
}
}
- ret.result = tmpBag;
- ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
- return ret;
+ val.result = tmpBag;
+ val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
+ return val;
}
public Result getNextBigInteger() throws ExecException {
@@ -451,6 +457,11 @@ public abstract class PhysicalOperator e
PhysicalOperator.reporter.set(reporter);
}
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ reporter = new ThreadLocal<PigProgressable>();
+ }
+
/**
* Make a deep copy of this operator. This function is blank, however,
* we should leave a place holder so that the subclasses can clone
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Nov 27 12:49:54 2014
@@ -18,10 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
-import static org.apache.pig.PigConfiguration.TIME_UDFS;
-import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
-import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY;
import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
+import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -37,7 +37,6 @@ import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
-import org.apache.pig.PigWarning;
import org.apache.pig.TerminatingAccumulator;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -67,7 +66,8 @@ public class POUserFunc extends Expressi
private transient String counterGroup;
private transient EvalFunc func;
- private transient String[] cacheFiles = null;
+ private transient List<String> cacheFiles = null;
+ private transient List<String> shipFiles = null;
FuncSpec funcSpec;
FuncSpec origFSpec;
@@ -157,10 +157,10 @@ public class POUserFunc extends Expressi
func.setPigLogger(pigLogger);
Configuration jobConf = UDFContext.getUDFContext().getJobConf();
if (jobConf != null) {
- doTiming = jobConf.getBoolean(TIME_UDFS, false);
+ doTiming = jobConf.getBoolean(PIG_UDF_PROFILE, false);
if (doTiming) {
counterGroup = funcSpec.toString();
- timingFrequency = jobConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+ timingFrequency = jobConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
}
}
// We initialize here instead of instantiateFunc because this is called
@@ -280,27 +280,6 @@ public class POUserFunc extends Expressi
}
try {
if(result.returnStatus == POStatus.STATUS_OK) {
- Tuple t = (Tuple) result.result;
-
- // For backward compatibility, we short-circuit tuples whose
- // size is 1 and field is null. (See PIG-3679)
- if (t.size() == 1 && t.isNull(0)) {
- pigLogger.warn(this, "All the input values are null, skipping the invocation of UDF",
- PigWarning.SKIP_UDF_CALL_FOR_NULL);
- Schema outputSchema = func.outputSchema(func.getInputSchema());
- // If the output schema is tuple (i.e. multiple fields are
- // to be returned), we return a tuple where every field is
- // null.
- if (outputSchema != null && outputSchema.getField(0).type == DataType.TUPLE) {
- result.result = tf.newTuple(outputSchema.getField(0).schema.size());
- // Otherwise, we simply return null since it can be cast to
- // any data type.
- } else {
- result.result = null;
- }
- return result;
- }
-
if (isAccumulative()) {
if (isAccumStarted()) {
if (!haveCheckedIfTerminatingAccumulator) {
@@ -554,20 +533,28 @@ public class POUserFunc extends Expressi
public FuncSpec getFuncSpec() {
return funcSpec;
}
-
+
public void setFuncSpec(FuncSpec funcSpec) {
this.funcSpec = funcSpec;
instantiateFunc(funcSpec);
}
- public String[] getCacheFiles() {
+ public List<String> getCacheFiles() {
return cacheFiles;
}
- public void setCacheFiles(String[] cf) {
+ public void setCacheFiles(List<String> cf) {
cacheFiles = cf;
}
+ public List<String> getShipFiles() {
+ return shipFiles;
+ }
+
+ public void setShipFiles(List<String> sf) {
+ shipFiles = sf;
+ }
+
public boolean combinable() {
return (func instanceof Algebraic);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Nov 27 12:49:54 2014
@@ -347,7 +347,12 @@ public class PhyPlanVisitor extends Plan
*/
public void visitPreCombinerLocalRearrange(
POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
- // TODO Auto-generated method stub
+ List<PhysicalPlan> inpPlans = preCombinerLocalRearrange.getPlans();
+ for (PhysicalPlan plan : inpPlans) {
+ pushWalker(mCurrentWalker.spawnChildWalker(plan));
+ visit();
+ popWalker();
+ }
}
public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 27 12:49:54 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.util.Arrays;
import java.util.Map;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -48,6 +49,9 @@ public class CombinerPackager extends Pa
private Map<Integer, Integer> keyLookup;
private int numBags;
+
+ private transient boolean initialized;
+ private transient boolean useDefaultBag;
/**
* A new POPostCombinePackage will be constructed as a near clone of the
@@ -91,15 +95,16 @@ public class CombinerPackager extends Pa
}
private DataBag createDataBag(int numBags) {
- String bagType = null;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
- }
-
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- return new NonSpillableDataBag();
+ if (!initialized) {
+ initialized = true;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
}
- return new InternalCachedBag(numBags);
+ return useDefaultBag ? new NonSpillableDataBag() : new InternalCachedBag(numBags);
}
@Override
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 27 12:49:54 2014
@@ -242,9 +242,15 @@ public class MultiQueryPackager extends
@Override
public Tuple getValueTuple(PigNullableWritable keyWritable,
- NullableTuple ntup, int index) throws ExecException {
+ NullableTuple ntup, int origIndex) throws ExecException {
this.keyWritable = keyWritable;
- return packagers.get(((int) index) & idxPart).getValueTuple(
- keyWritable, ntup, index);
+ int index = origIndex & idxPart;
+ PigNullableWritable newKey = keyWritable;
+ if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+ Tuple tup = (Tuple)this.keyWritable.getValueAsPigType();
+ newKey = HDataType.getWritableComparableTypes(tup.get(0), packagers.get(index).getKeyType());
+ newKey.setIndex((byte)origIndex);
+ }
+ return packagers.get(index).getValueTuple(newKey, ntup, origIndex);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Nov 27 12:49:54 2014
@@ -19,8 +19,8 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.List;
-import java.util.Iterator;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -38,9 +38,6 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
/**
* The collected group operator is a special operator used when users give
@@ -71,7 +68,7 @@ public class POCollectedGroup extends Ph
private Object prevKey = null;
- private boolean useDefaultBag = false;
+ private transient boolean useDefaultBag;
public POCollectedGroup(OperatorKey k) {
this(k, -1, null);
@@ -127,18 +124,14 @@ public class POCollectedGroup extends Ph
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- return getStreamCloseResult();
- }
-
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP) {
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by the mapper.
if (this.parentPlan.endOfAllInput) {
return getStreamCloseResult();
} else {
@@ -172,7 +165,7 @@ public class POCollectedGroup extends Ph
if (prevKey == null && outputBag == null) {
if (PigMapReduce.sJobConfInternal.get() != null) {
- String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Thu Nov 27 12:49:54 2014
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -42,15 +43,18 @@ import org.apache.pig.impl.plan.VisitorE
* Find the distinct set of tuples in a bag.
* This is a blocking operator. All the input is put in the hashset implemented
* in DistinctDataBag which also provides the other DataBag interfaces.
- *
- *
+ *
+ *
*/
public class PODistinct extends PhysicalOperator implements Cloneable {
private static final Log log = LogFactory.getLog(PODistinct.class);
private static final long serialVersionUID = 1L;
private boolean inputsAccumulated = false;
private DataBag distinctBag = null;
- transient Iterator<Tuple> it;
+
+ private transient boolean initialized;
+ private transient boolean useDefaultBag;
+ private transient Iterator<Tuple> it;
// PIG-3385: Since GlobalRearrange is not used by PODistinct, passing the
// custom partioner through here
@@ -87,17 +91,19 @@ public class PODistinct extends Physical
@Override
public Result getNextTuple() throws ExecException {
if (!inputsAccumulated) {
- // by default, we create InternalSortedBag, unless user configures
+ // by default, we create an InternalDistinctBag, unless the user configures
// explicitly to use old bag
- String bagType = null;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
- }
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- distinctBag = BagFactory.getInstance().newDistinctBag();
- } else {
- distinctBag = new InternalDistinctBag(3);
- }
+ if (!initialized) {
+ initialized = true;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_DISTINCT_TYPE);
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
+ distinctBag = useDefaultBag ? BagFactory.getInstance().newDistinctBag()
+ : new InternalDistinctBag(3);
Result in = processInput();
while (in.returnStatus != POStatus.STATUS_EOP) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Nov 27 12:49:54 2014
@@ -92,6 +92,10 @@ public class POForEach extends PhysicalO
protected Tuple inpTuple;
+ // Indicates that this foreach statement can only run on the map side
+ // Currently only used in MR cross (See PIG-4175)
+ protected boolean mapSideOnly = false;
+
private Schema schema;
public POForEach(OperatorKey k) {
@@ -274,8 +278,9 @@ public class POForEach extends PhysicalO
throw new ExecException(e);
}
}else{
- inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
- // buffer.clear();
+ if (buffer instanceof POPackage.POPackageTupleBuffer) {
+ inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+ }
setAccumEnd();
}
@@ -293,7 +298,7 @@ public class POForEach extends PhysicalO
break;
}
}
-
+ buffer.clear();
} else {
res = processPlan();
}
@@ -786,4 +791,11 @@ public class POForEach extends PhysicalO
return planLeafOps;
}
+ public void setMapSideOnly(boolean mapSideOnly) {
+ this.mapSideOnly = mapSideOnly;
+ }
+
+ public boolean isMapSideOnly() {
+ return mapSideOnly;
+ }
}