You are viewing a plain-text version of this content. The canonical link to it (shown as "here" on the original page) is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [8/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Feb 22 09:43:41 2017
@@ -32,7 +32,8 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,6 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.PlanException;
@@ -76,7 +76,7 @@ public abstract class Launcher {
protected Map<FileSpec, Exception> failureMap;
protected JobControl jc = null;
- class HangingJobKiller extends Thread {
+ protected class HangingJobKiller extends Thread {
public HangingJobKiller() {}
@Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
}
protected Launcher() {
- Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
// handle the windows portion of \r
if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
public void reset() {
failureMap = Maps.newHashMap();
totalHadoopTimeSpent = 0;
- jc = null;
}
/**
@@ -179,7 +177,7 @@ public abstract class Launcher {
String exceptionCreateFailMsg = null;
boolean jobFailed = false;
if (msgs.length > 0) {
- if (HadoopShims.isJobFailed(report)) {
+ if (report.getCurrentStatus()== TIPStatus.FAILED) {
jobFailed = true;
}
Set<String> errorMessageSet = new HashSet<String>();
@@ -261,11 +259,30 @@ public abstract class Launcher {
List<Job> runnJobs = jc.getRunningJobs();
for (Job j : runnJobs) {
- prog += HadoopShims.progressOfRunningJob(j);
+ prog += progressOfRunningJob(j);
}
return prog;
}
+ /**
+ * Returns the progress of a Job j which is part of a submitted JobControl
+ * object. The progress is for this Job. So it has to be scaled down by the
+ * num of jobs that are present in the JobControl.
+ *
+ * @param j The Job for which progress is required
+ * @return Returns the percentage progress of this Job
+ * @throws IOException
+ */
+ private static double progressOfRunningJob(Job j)
+ throws IOException {
+ org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
+ try {
+ return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
+ } catch (Exception ir) {
+ return 0;
+ }
+ }
+
public long getTotalHadoopTimeSpent() {
return totalHadoopTimeSpent;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Wed Feb 22 09:43:41 2017
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -122,7 +123,8 @@ public class FetchLauncher {
poStore.setUp();
TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
- HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+ //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO
}
if (outputCommitter.needsTaskCommit(context))
outputCommitter.commitTask(context);
- HadoopShims.commitOrCleanup(outputCommitter, context);
+ outputCommitter.commitJob(context);
}
@Override
@@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO
}
writer = null;
}
- HadoopShims.commitOrCleanup(outputCommitter, context);
+ outputCommitter.commitJob(context);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Wed Feb 22 09:43:41 2017
@@ -22,43 +22,48 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
/**
* A special implementation of combiner used only for distinct. This combiner
* does not even parse out the records. It just throws away duplicate values
- * in the key in order ot minimize the data being sent to the reduce.
+ * in the key in order to minimize the data being sent to the reduce.
*/
public class DistinctCombiner {
- public static class Combine
+ public static class Combine
extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+
private final Log log = LogFactory.getLog(getClass());
- ProgressableReporter pigReporter;
-
- /**
- * Configures the reporter
- */
+ private static boolean firstTime = true;
+
+ //@StaticDataCleanup
+ public static void staticDataCleanup() {
+ firstTime = true;
+ }
+
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- pigReporter = new ProgressableReporter();
+ Configuration jConf = context.getConfiguration();
+ // Avoid log spamming
+ if (firstTime) {
+ log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
+ firstTime = false;
+ }
}
-
+
/**
* The reduce function which removes values.
*/
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
throws IOException, InterruptedException {
-
- pigReporter.setRep(context);
// Take the first value and the key and collect
// just that.
@@ -66,6 +71,7 @@ public class DistinctCombiner {
NullableTuple val = iter.next();
context.write(key, val);
}
+
}
-
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Wed Feb 22 09:43:41 2017
@@ -75,16 +75,24 @@ public class FileBasedOutputSizeReader i
return -1;
}
- long bytes = 0;
Path p = new Path(getLocationUri(sto));
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] lst = fs.listStatus(p);
+ return getPathSize(p, p.getFileSystem(conf));
+ }
+
+ private long getPathSize(Path storePath, FileSystem fs) throws IOException {
+ long bytes = 0;
+ FileStatus[] lst = fs.listStatus(storePath);
if (lst != null) {
for (FileStatus status : lst) {
- bytes += status.getLen();
+ if (status.isFile()) {
+ if (status.getLen() > 0)
+ bytes += status.getLen();
+ }
+ else { // recursively count nested leaves' (files) sizes
+ bytes += getPathSize(status.getPath(), fs);
+ }
}
}
-
return bytes;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Wed Feb 22 09:43:41 2017
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
return reducers;
}
- static long getTotalInputFileSize(Configuration conf,
+ public static long getTotalInputFileSize(Configuration conf,
List<POLoad> lds, Job job) throws IOException {
return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
}
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
/**
* Get the input size for as many inputs as possible. Inputs that do not report
* their size nor can pig look that up itself are excluded from this size.
- *
+ *
* @param conf Configuration
* @param lds List of POLoads
* @param job Job
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 22 09:43:41 2017
@@ -24,7 +24,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -61,6 +60,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -71,6 +71,7 @@ import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.PigJobControl;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -89,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
@@ -122,6 +122,7 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
@@ -311,7 +312,7 @@ public class JobControlCompiler{
" should be a time in ms. default=" + defaultPigJobControlSleep, e);
}
- JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+ JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
try {
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -384,7 +385,7 @@ public class JobControlCompiler{
ArrayList<Pair<String,Long>> counterPairs;
try {
- counters = HadoopShims.getCounters(job);
+ counters = MRJobStats.getCounters(job);
String groupName = getGroupName(counters.getGroupNames());
// In case that the counter group was not find, we need to find
@@ -702,7 +703,8 @@ public class JobControlCompiler{
// since this path would be invalid for the new job being created
pigContext.getProperties().remove("mapreduce.job.credentials.binary");
- conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+ conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
+ conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
// this is for unit tests since some don't create PigServer
@@ -1671,14 +1673,6 @@ public class JobControlCompiler{
if (distCachePath != null) {
log.info("Jar file " + url + " already in DistributedCache as "
+ distCachePath + ". Not copying to hdfs and adding again");
- // Path already in dist cache
- if (!HadoopShims.isHadoopYARN()) {
- // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
- // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
- // But path may only be in 'mapred.cache.files' and not be in
- // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
- DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
- }
}
else {
// REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1964,20 +1958,9 @@ public class JobControlCompiler{
public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- try {
- Class<?> clazz = PigContext
- .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- Method method = clazz.getMethod("setOutputFormatClass",
- org.apache.hadoop.mapreduce.Job.class, Class.class);
- method.invoke(null, job, PigOutputFormat.class);
- } catch (Exception e) {
- job.setOutputFormatClass(PigOutputFormat.class);
- log.warn(PigConfiguration.PIG_OUTPUT_LAZY
- + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
- }
+ LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class);
} else {
job.setOutputFormatClass(PigOutputFormat.class);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 22 09:43:41 2017
@@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN
+ || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+ // Bloom join is not implemented in mapreduce mode and falls back to regular join
curMROp.markRegularJoin();
} else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
@@ -1278,7 +1280,7 @@ public class MRCompiler extends PhyPlanV
List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
List<List<InputSplit>> results = MapRedUtil
.getCombinePigSplits(splits,
- HadoopShims.getDefaultBlockSize(fs, path),
+ fs.getDefaultBlockSize(path),
conf);
numFiles += results.size();
} else {
@@ -2432,7 +2434,7 @@ public class MRCompiler extends PhyPlanV
}else{
for(int i=0; i<transformPlans.size(); i++) {
eps1.add(transformPlans.get(i));
- flat1.add(true);
+ flat1.add(i == transformPlans.size() - 1 ? true : false);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 22 09:43:41 2017
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.PrintStream;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -40,7 +42,8 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.pig.PigConfiguration;
@@ -65,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -78,15 +82,18 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.python.google.common.collect.Lists;
+
/**
* Main class that launches pig for Map Reduce
*
*/
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -94,14 +101,30 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
+ public MapReduceLauncher() {
+ super();
+ Utils.addShutdownHookWithPriority(new HangingJobKiller(),
+ PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+ }
+
@Override
public void kill() {
try {
- log.debug("Receive kill signal");
- if (jc!=null) {
+ if (jc != null && jc.getRunningJobs().size() > 0) {
+ log.info("Received kill signal");
for (Job job : jc.getRunningJobs()) {
- HadoopShims.killJob(job);
+ org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+ try {
+ if (mrJob != null) {
+ mrJob.killJob();
+ }
+ } catch (Exception ir) {
+ throw new IOException(ir);
+ }
log.info("Job " + job.getAssignedJobID() + " killed");
+ String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(Calendar.getInstance().getTime());
+ System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
}
}
} catch (Exception e) {
@@ -301,8 +324,7 @@ public class MapReduceLauncher extends L
// Now wait, till we are finished.
while(!jc.allFinished()){
- try { jcThread.join(sleepTime); }
- catch (InterruptedException e) {}
+ jcThread.join(sleepTime);
List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
@@ -321,11 +343,6 @@ public class MapReduceLauncher extends L
log.info("detailed locations: " + aliasLocation);
}
- if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
- log.info("More information at: http://" + jobTrackerLoc
- + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
- }
-
// update statistics for this job so jobId is set
MRPigStatsUtil.addJobStats(job);
MRScriptState.get().emitJobStartedNotification(
@@ -475,10 +492,6 @@ public class MapReduceLauncher extends L
for (Job job : succJobs) {
List<POStore> sts = jcc.getStores(job);
for (POStore st : sts) {
- if (Utils.isLocal(pc, job.getJobConf())) {
- HadoopShims.storeSchemaForLocal(job, st);
- }
-
if (!st.isTmpStore()) {
// create an "_SUCCESS" file in output location if
// output location is a filesystem dir
@@ -744,7 +757,7 @@ public class MapReduceLauncher extends L
@SuppressWarnings("deprecation")
void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
try {
- Counters counters = HadoopShims.getCounters(job);
+ Counters counters = MRJobStats.getCounters(job);
if (counters==null)
{
long nullCounterCount =
@@ -798,13 +811,13 @@ public class MapReduceLauncher extends L
throw new ExecException(backendException);
}
try {
- Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+ Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
if (mapRep != null) {
getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
mapRep = null;
}
- Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
if (redRep != null) {
getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -822,5 +835,6 @@ public class MapReduceLauncher extends L
throw new ExecException(e);
}
}
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Feb 22 09:43:41 2017
@@ -65,7 +65,10 @@ public class MapReduceOper extends Opera
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
public byte mapKeyType;
-
+
+ //record the map key types of all splittees
+ public byte[] mapKeyTypeOfSplittees;
+
//Indicates that the map plan creation
//is complete
boolean mapDone = false;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Feb 22 09:43:41 2017
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -580,18 +581,17 @@ class MultiQueryOptimizer extends MROpPl
}
private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
- boolean sameKeyType = true;
- for (MapReduceOper outer : splittees) {
- for (MapReduceOper inner : splittees) {
- if (inner.mapKeyType != outer.mapKeyType) {
- sameKeyType = false;
- break;
+ Set<Byte> keyTypes = new HashSet<Byte>();
+ for (MapReduceOper splittee : splittees) {
+ keyTypes.add(splittee.mapKeyType);
+ if (splittee.mapKeyTypeOfSplittees != null) {
+ for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) {
+ keyTypes.add(splittee.mapKeyTypeOfSplittees[i]);
}
}
- if (!sameKeyType) break;
- }
- return sameKeyType;
+ }
+ return keyTypes.size() == 1;
}
private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType)
@@ -1035,10 +1035,20 @@ class MultiQueryOptimizer extends MROpPl
splitter.mapKeyType = sameKeyType ?
mergeList.get(0).mapKeyType : DataType.TUPLE;
+
+ setMapKeyTypeForSplitter(splitter,mergeList);
+
log.info("Requested parallelism of splitter: "
+ splitter.getRequestedParallelism());
}
+ private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) {
+ splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()];
+ for (int i = 0; i < mergeList.size(); i++) {
+ splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType;
+ }
+ }
+
private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
MapReduceOper splitter, POSplit splitOp) throws VisitorException {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Feb 22 09:43:41 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,9 +37,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -72,7 +75,6 @@ public class PigCombiner {
PhysicalOperator[] roots;
PhysicalOperator leaf;
- PigContext pigContext = null;
private volatile boolean initialized = false;
//@StaticDataCleanup
@@ -91,9 +93,11 @@ public class PigCombiner {
Configuration jConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
- if (pigContext.getLog4jProperties()!=null)
- PropertyConfigurator.configure(pigContext.getLog4jProperties());
+ Properties log4jProperties = (Properties) ObjectSerializer
+ .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+ if (log4jProperties != null) {
+ PropertyConfigurator.configure(log4jProperties);
+ }
UDFContext.getUDFContext().reset();
MapRedUtil.setupUDFContext(context.getConfiguration());
@@ -143,7 +147,7 @@ public class PigCombiner {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -157,7 +161,7 @@ public class PigCombiner {
// tuples out of the getnext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPacakage.getNext()
- if (pack.getPkgr() instanceof JoinPackager)
+ if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
@@ -268,7 +272,6 @@ public class PigCombiner {
pigReporter = null;
// Avoid OOM in Tez.
PhysicalOperator.setReporter(null);
- pigContext = null;
roots = null;
cp = null;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Wed Feb 22 09:43:41 2017
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -88,7 +90,6 @@ public abstract class PigGenericMapBase
private PhysicalOperator leaf;
- PigContext pigContext = null;
private volatile boolean initialized = false;
/**
@@ -168,13 +169,15 @@ public abstract class PigGenericMapBase
inIllustrator = inIllustrator(context);
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(job, pigContext);
+ SchemaTupleBackend.initialize(job);
- if (pigContext.getLog4jProperties()!=null)
- PropertyConfigurator.configure(pigContext.getLog4jProperties());
+ Properties log4jProperties = (Properties) ObjectSerializer
+ .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+ if (log4jProperties != null) {
+ PropertyConfigurator.configure(log4jProperties);
+ }
if (mp == null)
mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -236,7 +239,7 @@ public abstract class PigGenericMapBase
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -249,8 +252,7 @@ public abstract class PigGenericMapBase
MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
- if (!pigContext.inIllustrator)
- store.setUp();
+ store.setUp();
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Wed Feb 22 09:43:41 2017
@@ -287,7 +287,6 @@ public class PigGenericMapReduce {
private PhysicalOperator leaf;
- PigContext pigContext = null;
protected volatile boolean initialized = false;
private boolean inIllustrator = false;
@@ -319,10 +318,9 @@ public class PigGenericMapReduce {
sJobConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(jConf, pigContext);
+ SchemaTupleBackend.initialize(jConf);
if (rp == null)
rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Feb 22 09:43:41 2017
@@ -17,9 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import java.util.Map;
-import java.util.WeakHashMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.EvalFunc;
@@ -41,7 +38,6 @@ public final class PigHadoopLogger imple
private PigStatusReporter reporter = null;
private boolean aggregate = false;
- private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
private PigHadoopLogger() {
}
@@ -68,11 +64,6 @@ public final class PigHadoopLogger imple
if (getAggregate()) {
if (reporter != null) {
- // log at least once
- if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
- log.warn(displayMessage);
- msgMap.put(o, displayMessage);
- }
if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
reporter.incrCounter(className, warningEnum.name(), 1);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed Feb 22 09:43:41 2017
@@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu
ArrayList<FileSpec> inputs;
ArrayList<ArrayList<OperatorKey>> inpTargets;
- PigContext pigContext;
try {
inputs = (ArrayList<FileSpec>) ObjectSerializer
.deserialize(conf.get(PIG_INPUTS));
inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
.deserialize(conf.get(PIG_INPUT_TARGETS));
- pigContext = (PigContext) ObjectSerializer.deserialize(conf
- .get("pig.pigContext"));
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
MapRedUtil.setupUDFContext(conf);
} catch (Exception e) {
@@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
- if(!Utils.isLocal(pigContext, conf)) {
+ if(!Utils.isLocal(conf)) {
fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
}
@@ -270,7 +267,7 @@ public class PigInputFormat extends Inpu
jobcontext.getJobID()));
List<InputSplit> oneInputPigSplits = getPigSplits(
oneInputSplits, i, inpTargets.get(i),
- HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+ fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
combinable, confClone);
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+
+abstract public class PigMapBase extends PigGenericMapBase { // adds illustrate-mode context handling to PigGenericMapBase
+ /**
+ * Get mapper's illustrator context: a WrappedMapper context around an
+ * {@link IllustratorContext} that feeds tuples from {@code input} and records map output.
+ *
+ * @param conf Configuration; "inIllustrator" is set on it as a side effect
+ * @param input Input bag to serve as data source (null: exactly one record with a null value)
+ * @param output Map output buffer that receives every written (key, value) pair; must not be null
+ * @param split the split
+ * @return Illustrator's context
+ * @throws IOException if {@code output} is null
+ * @throws InterruptedException
+ */
+ @Override
+ public Context getIllustratorContext(Configuration conf, DataBag input,
+ List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+ throws IOException, InterruptedException {
+ org.apache.hadoop.mapreduce.Mapper.Context mapperContext = new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>().getMapContext(new IllustratorContext(conf, input, output, split));
+ return mapperContext;
+ }
+ // Map context that replays an in-memory bag instead of reading records from a RecordReader.
+ public class IllustratorContext extends MapContextImpl<Text, Tuple, PigNullableWritable, Writable> {
+ private DataBag input; // source tuples; may be null (see nextKeyValue)
+ List<Pair<PigNullableWritable, Writable>> output; // collects every pair passed to write()
+ private Iterator<Tuple> it = null; // created lazily on the first nextKeyValue()
+ private Tuple value = null; // tuple most recently returned by nextKeyValue()
+ private boolean init = false; // only used when input is null: the single record was already emitted
+
+ public IllustratorContext(Configuration conf, DataBag input,
+ List<Pair<PigNullableWritable, Writable>> output,
+ InputSplit split) throws IOException, InterruptedException {
+ super(conf, new TaskAttemptID(), null, null, null, new IllustrateDummyReporter(), split); // reader/writer/committer null: nextKeyValue()/write() are overridden below
+ conf.set("inIllustrator", "true"); // marker checked by inIllustrator(Context)
+ if (output == null)
+ throw new IOException("Null output can not be used");
+ this.input = input; this.output = output;
+ }
+ // Advances to the next tuple of the bag; with a null bag yields exactly one record whose value is null.
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (input == null) {
+ if (!init) {
+ init = true;
+ return true;
+ }
+ return false;
+ }
+ if (it == null)
+ it = input.iterator();
+ if (!it.hasNext())
+ return false;
+ value = it.next();
+ return true;
+ }
+ // The key is always null; only the tuple value is supplied.
+ @Override
+ public Text getCurrentKey() {
+ return null;
+ }
+ // Tuple most recently returned by nextKeyValue(), or null before the first one.
+ @Override
+ public Tuple getCurrentValue() {
+ return value;
+ }
+ // Map output is captured in the output list rather than written to a RecordWriter.
+ @Override
+ public void write(PigNullableWritable key, Writable value)
+ throws IOException, InterruptedException {
+ output.add(new Pair<PigNullableWritable, Writable>(key, value));
+ }
+ // Progress reporting is a no-op for this in-memory context.
+ @Override
+ public void progress() {
+ // intentionally empty
+ }
+ }
+ // True when the "inIllustrator" flag (set by IllustratorContext) is present in the configuration.
+ @Override
+ public boolean inIllustrator(Context context) {
+ return ((WrappedMapper.Context)context).getConfiguration().get("inIllustrator")!=null; // NOTE(review): cast presumes a WrappedMapper.Context - confirm for all callers
+ }
+}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.IllustratorContext;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+ // Reducer-side illustrate support: wraps an in-memory ReduceContext so getPack() can return its POPackage.
+ static class IllustrateReducerContext extends WrappedReducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+ public IllustratorContext
+ getReducerContext(ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
+ return new IllustratorContext(reduceContext);
+ }
+ // Reducer.Context wrapper that also exposes the POPackage held by Reduce.IllustratorContextImpl.
+ public class IllustratorContext
+ extends WrappedReducer.Context {
+ public IllustratorContext(
+ ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
+ super(reduceContext);
+ }
+ public POPackage getPack() { // assumes the wrapped context is a Reduce.IllustratorContextImpl (unchecked cast)
+ return ((Reduce.IllustratorContextImpl)reduceContext).pack;
+ }
+ }
+ }
+ // Reducer that can also run over an in-memory illustrate context (see getIllustratorContext).
+ public static class Reduce extends PigGenericMapReduce.Reduce {
+ /**
+ * Get reducer's illustrator context, backed by an in-memory, sorted copy of {@code input}.
+ * @param job job whose JobConf supplies the configuration and the sort/grouping comparators
+ * @param input Input buffer as output by maps; sorted in place by this call
+ * @param pkg package operator later returned by getPack(Context)
+ * @return reducer's illustrator context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public Context getIllustratorContext(Job job,
+ List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+ org.apache.hadoop.mapreduce.Reducer.Context reducerContext = new IllustrateReducerContext()
+ .getReducerContext(new IllustratorContextImpl(job, input, pkg));
+ return reducerContext;
+ }
+
+ @SuppressWarnings("unchecked")
+ public class IllustratorContextImpl extends ReduceContextImpl<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+ private PigNullableWritable currentKey = null, nextKey = null; // key being reduced / first key of the following group
+ private NullableTuple nextValue = null; // value paired with nextKey
+ private List<NullableTuple> currentValues = null; // all values of the current key group
+ private Iterator<Pair<PigNullableWritable, Writable>> it; // cursor over the sorted input
+ private final ByteArrayOutputStream bos; // scratch buffer for serialized keys in sort comparisons
+ private final DataOutputStream dos; // writes into bos
+ private final RawComparator sortComparator, groupingComparator; // obtained from the job configuration
+ public POPackage pack = null; // exposed via IllustrateReducerContext.IllustratorContext.getPack()
+ private IllustratorValueIterable iterable = new IllustratorValueIterable();
+ // Sorts input (in place) with the job's sort comparator and primes the first key/value pair.
+ public IllustratorContextImpl(Job job,
+ List<Pair<PigNullableWritable, Writable>> input,
+ POPackage pkg
+ ) throws IOException, InterruptedException {
+ super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
+ null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class);
+ bos = new ByteArrayOutputStream();
+ dos = new DataOutputStream(bos);
+ org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+ sortComparator = nwJob.getSortComparator();
+ groupingComparator = nwJob.getGroupingComparator();
+ // Sort input by serializing both keys into bos and comparing the bytes with sortComparator.
+ Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+ @Override
+ public int compare(Pair<PigNullableWritable, Writable> o1,
+ Pair<PigNullableWritable, Writable> o2) {
+ try {
+ o1.first.write(dos);
+ int l1 = bos.size();
+ o2.first.write(dos);
+ int l2 = bos.size();
+ byte[] bytes = bos.toByteArray();
+ bos.reset(); // reuse the shared buffer for the next comparison
+ return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1); // key1 bytes [0,l1) vs key2 bytes [l1,l2)
+ } catch (IOException e) {
+ throw new RuntimeException("Serialization exception in sort:"+e.getMessage()); // NOTE(review): cause 'e' is not chained
+ }
+ }
+ }
+ );
+ currentValues = new ArrayList<NullableTuple>();
+ it = input.iterator();
+ if (it.hasNext()) { // prime the first key; empty input leaves nextKey null so nextKey() returns false
+ Pair<PigNullableWritable, Writable> entry = it.next();
+ nextKey = entry.first;
+ nextValue = (NullableTuple) entry.second;
+ }
+ pack = pkg;
+ }
+ // Iterator over currentValues; pos is the index of the last value returned (-1 = none yet).
+ public class IllustratorValueIterator implements ReduceContext.ValueIterator<NullableTuple> {
+
+ private int pos = -1;
+ private int mark = -1;
+ // mark() stores pos-1 (floored at -1); after reset(), next() returns again the value last returned before mark().
+ @Override
+ public void mark() throws IOException {
+ mark=pos-1;
+ if (mark<-1)
+ mark=-1;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ pos=mark;
+ }
+
+ @Override
+ public void clearMark() throws IOException {
+ mark=-1;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return pos<currentValues.size()-1;
+ }
+
+ @Override
+ public NullableTuple next() {
+ pos++;
+ return currentValues.get(pos);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove not implemented");
+ }
+ // Rewinds to the start of the group and clears the mark; nextKey() itself never resets pos.
+ @Override
+ public void resetBackupStore() throws IOException {
+ pos=-1;
+ mark=-1;
+ }
+ // NOTE(review): presumably the caller invokes resetBackupStore() between keys (pos otherwise carries over) - confirm.
+ }
+ // Iterable handed out by getValues(); always returns the same shared iterator instance.
+ protected class IllustratorValueIterable implements Iterable<NullableTuple> {
+ private IllustratorValueIterator iterator = new IllustratorValueIterator();
+ @Override
+ public Iterator<NullableTuple> iterator() {
+ return iterator;
+ }
+ }
+
+ @Override
+ public PigNullableWritable getCurrentKey() {
+ return currentKey;
+ }
+ // Advances to the next key group: gathers consecutive entries that groupingComparator deems equal to currentKey.
+ @Override
+ public boolean nextKey() {
+ if (nextKey == null)
+ return false; // input exhausted
+ currentKey = nextKey;
+ currentValues.clear();
+ currentValues.add(nextValue);
+ nextKey = null; // stays null if no later group exists
+ for(; it.hasNext(); ) {
+ Pair<PigNullableWritable, Writable> entry = it.next();
+ /* Why can't raw comparison be used?
+ byte[] bytes;
+ int l1, l2;
+ try {
+ currentKey.write(dos);
+ l1 = bos.size();
+ entry.first.write(dos);
+ l2 = bos.size();
+ bytes = bos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("nextKey exception : "+e.getMessage());
+ }
+ bos.reset();
+ if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
+ */
+ if (groupingComparator.compare(currentKey, entry.first) == 0) // object comparison; the byte-level variant above is disabled
+ {
+ currentValues.add((NullableTuple)entry.second);
+ } else {
+ nextKey = entry.first;
+ nextValue = (NullableTuple) entry.second;
+ break; // first key of the next group found
+ }
+ }
+ return true;
+ }
+ // Values of the current key group, served through the shared IllustratorValueIterator.
+ @Override
+ public Iterable<NullableTuple> getValues() {
+ return iterable;
+ }
+ // Reduce output is discarded in this context.
+ @Override
+ public void write(PigNullableWritable k, Writable t) {
+ }
+ // Progress reporting is a no-op here.
+ @Override
+ public void progress() {
+ }
+ }
+ // Illustrate mode is recognized by the concrete context wrapper type.
+ @Override
+ public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) {
+ return (context instanceof PigMapReduce.IllustrateReducerContext.IllustratorContext);
+ }
+ // Only valid for contexts built by getIllustratorContext(); any other context type fails the cast.
+ @Override
+ public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) {
+ return ((PigMapReduce.IllustrateReducerContext.IllustratorContext) context).getPack();
+ }
+ }
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Wed Feb 22 09:43:41 2017
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@@ -156,12 +155,7 @@ public class PigOutputCommitter extends
for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
if (mapCommitter.first!=null) {
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
- && (Boolean)m.invoke(mapCommitter.first);
- } catch (NoSuchMethodException e) {
- allOutputCommitterSupportRecovery = false;
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -173,12 +167,7 @@ public class PigOutputCommitter extends
reduceOutputCommitters) {
if (reduceCommitter.first!=null) {
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
- && (Boolean)m.invoke(reduceCommitter.first);
- } catch (NoSuchMethodException e) {
- allOutputCommitterSupportRecovery = false;
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -197,10 +186,7 @@ public class PigOutputCommitter extends
mapCommitter.second);
try {
// Use reflection, Hadoop 1.x line does not have such method
- Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
- m.invoke(mapCommitter.first, updatedContext);
- } catch (NoSuchMethodException e) {
- // We are using Hadoop 1.x, ignore
+ mapCommitter.first.recoverTask(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -212,11 +198,7 @@ public class PigOutputCommitter extends
TaskAttemptContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
- m.invoke(reduceCommitter.first, updatedContext);
- } catch (NoSuchMethodException e) {
- // We are using Hadoop 1.x, ignore
+ reduceCommitter.first.recoverTask(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -256,10 +238,7 @@ public class PigOutputCommitter extends
mapCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- // Use reflection, 20.2 does not have such method
- Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
- m.setAccessible(true);
- m.invoke(mapCommitter.first, updatedContext);
+ mapCommitter.first.commitJob(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -273,10 +252,7 @@ public class PigOutputCommitter extends
reduceCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- // Use reflection, 20.2 does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
- m.setAccessible(true);
- m.invoke(reduceCommitter.first, updatedContext);
+ reduceCommitter.first.commitJob(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -293,10 +269,7 @@ public class PigOutputCommitter extends
JobContext updatedContext = setUpContext(context,
mapCommitter.second);
try {
- // Use reflection, 20.2 does not have such method
- Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
- m.setAccessible(true);
- m.invoke(mapCommitter.first, updatedContext, state);
+ mapCommitter.first.abortJob(updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
@@ -309,10 +282,7 @@ public class PigOutputCommitter extends
JobContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- // Use reflection, 20.2 does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
- m.setAccessible(true);
- m.invoke(reduceCommitter.first, updatedContext, state);
+ reduceCommitter.first.abortJob(updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Wed Feb 22 09:43:41 2017
@@ -515,9 +515,11 @@ public class PigSplit extends InputSplit
for (int i = 0; i < wrappedSplits.length; i++) {
st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n ClassName: " +
wrappedSplits[i].getClass().getName() + "\n Locations:\n");
- for (String location : wrappedSplits[i].getLocations())
- st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
+ if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) {
+ for (String location : wrappedSplits[i].getLocations())
+ st.append(" "+location+"\n");
+ st.append("\n-----------------------\n");
+ }
}
} catch (IOException e) {
return null;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Wed Feb 22 09:43:41 2017
@@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe
Random rGen;
float[] probVec;
float epsilon = 0.0001f;
-
+
private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-
- public DiscreteProbabilitySampleGenerator(float[] probVec) {
- rGen = new Random();
+
+ public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
+ rGen = new Random(seed);
float sum = 0.0f;
for (float f : probVec) {
sum += f;
}
this.probVec = probVec;
- if (1-epsilon > sum || sum > 1+epsilon) {
+ if (1-epsilon > sum || sum > 1+epsilon) {
LOG.info("Sum of probabilities should be near one: " + sum);
}
}
-
+
public int getNext(){
double toss = rGen.nextDouble();
// if the uniformly random number that I generated
@@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe
toss -= probVec[i];
if(toss<=0.0)
return i;
- }
+ }
return lastIdx;
}
-
+
public static void main(String[] args) {
float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
- DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
+ DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
CountingMap<Integer> cm = new CountingMap<Integer>();
for(int i=0;i<100;i++){
cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe
public String toString() {
return Arrays.toString(probVec);
}
-
-
+
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Wed Feb 22 09:43:41 2017
@@ -17,7 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,13 +30,13 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.NullableBigDecimalWritable;
import org.apache.pig.impl.io.NullableBigIntegerWritable;
@@ -52,7 +51,6 @@ import org.apache.pig.impl.io.NullableTe
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
@@ -62,7 +60,6 @@ public class WeightedRangePartitioner ex
new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
protected PigNullableWritable[] quantiles;
protected RawComparator<PigNullableWritable> comparator;
- private PigContext pigContext;
protected Configuration job;
protected boolean inited = false;
@@ -93,11 +90,6 @@ public class WeightedRangePartitioner ex
@SuppressWarnings("unchecked")
public void init() {
weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
- try {
- pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
- } catch (IOException e) {
- throw new RuntimeException("Failed to deserialize pig context: ", e);
- }
String quantilesFile = job.get("pig.quantilesFile", "");
if (quantilesFile.length() == 0) {
@@ -109,10 +101,10 @@ public class WeightedRangePartitioner ex
// use local file system to get the quantilesFile
Map<String, Object> quantileMap = null;
Configuration conf;
- if (!pigContext.getExecType().isLocal()) {
- conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
- } else {
+ if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) {
conf = new Configuration(false);
+ } else {
+ conf = new Configuration(job);
}
if (job.get("fs.file.impl") != null) {
conf.set("fs.file.impl", job.get("fs.file.impl"));
@@ -138,11 +130,13 @@ public class WeightedRangePartitioner ex
DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
+ long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
+ long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
Tuple key = (Tuple)ent.getKey(); // sample item which repeats
float[] probVec = getProbVec((Tuple)ent.getValue());
weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(probVec));
+ new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
}
}
// else - the quantiles file is empty - unless we have a bug, the
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Wed Feb 22 09:43:41 2017
@@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends
public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
endOfAllInputFlag = true;
}
-
+
@Override
public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
endOfAllInputFlag = true;
@@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends
}
}
+ @Override
+ public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+ if (lr instanceof POBuildBloomRearrangeTez) {
+ endOfAllInputFlag = true;
+ }
+ super.visitLocalRearrange(lr);
+ }
/**
* @return if end of all input is present
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Wed Feb 22 09:43:41 2017
@@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV
* @param plan MR plan to print
*/
public MRPrinter(PrintStream ps, MROperPlan plan) {
- super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true));
mStream = ps;
mStream.println("#--------------------------------------------------");
mStream.println("# Map Reduce Plan ");
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Feb 22 09:43:41 2017
@@ -441,6 +441,10 @@ public abstract class PhysicalOperator e
public void reset() {
}
+ public boolean isEndOfAllInput() {
+ return parentPlan.endOfAllInput;
+ }
+
/**
* @return PigProgressable stored in threadlocal
*/
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Wed Feb 22 09:43:41 2017
@@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.math.RoundingMode;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -36,6 +39,8 @@ public class Divide extends BinaryExpres
*
*/
private static final long serialVersionUID = 1L;
+ public static final short BIGDECIMAL_MINIMAL_SCALE = 6;
+ private static final Log LOG = LogFactory.getLog(Divide.class);
public Divide(OperatorKey k) {
super(k);
@@ -72,12 +77,22 @@ public class Divide extends BinaryExpres
case DataType.BIGINTEGER:
return ((BigInteger) a).divide((BigInteger) b);
case DataType.BIGDECIMAL:
- return ((BigDecimal) a).divide((BigDecimal) b);
+ return bigDecimalDivideWithScale(a, b);
default:
throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
}
}
+ private Number bigDecimalDivideWithScale(Number a, Number b) {
+ // Using same result scaling as Hive. See Arithmetic Rules:
+ // https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf
+ int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale.");
+ }
+ return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP);
+ }
+
/*
* This method is used to invoke the appropriate method, as Java does not provide generic
* dispatch for it.