You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC
svn commit: r1784224 [4/17] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 24 03:34:37 2017
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -60,7 +61,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -71,7 +71,6 @@ import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.PigJobControl;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -90,6 +89,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
@@ -122,7 +122,6 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
@@ -312,7 +311,7 @@ public class JobControlCompiler{
" should be a time in ms. default=" + defaultPigJobControlSleep, e);
}
- JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
+ JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
try {
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -385,7 +384,7 @@ public class JobControlCompiler{
ArrayList<Pair<String,Long>> counterPairs;
try {
- counters = MRJobStats.getCounters(job);
+ counters = HadoopShims.getCounters(job);
String groupName = getGroupName(counters.getGroupNames());
// In case that the counter group was not found, we need to find
@@ -703,8 +702,7 @@ public class JobControlCompiler{
// since this path would be invalid for the new job being created
pigContext.getProperties().remove("mapreduce.job.credentials.binary");
- conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
- conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
+ conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
// this is for unit tests since some don't create PigServer
@@ -1673,6 +1671,14 @@ public class JobControlCompiler{
if (distCachePath != null) {
log.info("Jar file " + url + " already in DistributedCache as "
+ distCachePath + ". Not copying to hdfs and adding again");
+ // Path already in dist cache
+ if (!HadoopShims.isHadoopYARN()) {
+ // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classpath.
+ // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
+ // But path may only be in 'mapred.cache.files' and not be in
+ // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
+ DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
+ }
}
else {
// REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1958,9 +1964,20 @@ public class JobControlCompiler{
public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat provided if it is supported by
+ // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class);
+ try {
+ Class<?> clazz = PigContext
+ .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ Method method = clazz.getMethod("setOutputFormatClass",
+ org.apache.hadoop.mapreduce.Job.class, Class.class);
+ method.invoke(null, job, PigOutputFormat.class);
+ } catch (Exception e) {
+ job.setOutputFormatClass(PigOutputFormat.class);
+ log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+ }
} else {
job.setOutputFormatClass(PigOutputFormat.class);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 24 03:34:37 2017
@@ -1116,9 +1116,7 @@ public class MRCompiler extends PhyPlanV
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPkgr().getPackageType() == PackageType.JOIN
- || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
- // Bloom join is not implemented in mapreduce mode and falls back to regular join
+ if (op.getPkgr().getPackageType() == PackageType.JOIN) {
curMROp.markRegularJoin();
} else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
@@ -1280,7 +1278,7 @@ public class MRCompiler extends PhyPlanV
List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
List<List<InputSplit>> results = MapRedUtil
.getCombinePigSplits(splits,
- fs.getDefaultBlockSize(path),
+ HadoopShims.getDefaultBlockSize(fs, path),
conf);
numFiles += results.size();
} else {
@@ -2434,7 +2432,7 @@ public class MRCompiler extends PhyPlanV
}else{
for(int i=0; i<transformPlans.size(); i++) {
eps1.add(transformPlans.get(i));
- flat1.add(i == transformPlans.size() - 1 ? true : false);
+ flat1.add(true);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 24 03:34:37 2017
@@ -19,9 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.PrintStream;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -42,8 +40,7 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.pig.PigConfiguration;
@@ -68,7 +65,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -82,18 +78,15 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
-import org.python.google.common.collect.Lists;
-
/**
* Main class that launches pig for Map Reduce
*
*/
-public class MapReduceLauncher extends Launcher {
+public class MapReduceLauncher extends Launcher{
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -101,30 +94,14 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
- public MapReduceLauncher() {
- super();
- Utils.addShutdownHookWithPriority(new HangingJobKiller(),
- PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
- }
-
@Override
public void kill() {
try {
- if (jc != null && jc.getRunningJobs().size() > 0) {
- log.info("Received kill signal");
+ log.debug("Receive kill signal");
+ if (jc!=null) {
for (Job job : jc.getRunningJobs()) {
- org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
- try {
- if (mrJob != null) {
- mrJob.killJob();
- }
- } catch (Exception ir) {
- throw new IOException(ir);
- }
+ HadoopShims.killJob(job);
log.info("Job " + job.getAssignedJobID() + " killed");
- String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(Calendar.getInstance().getTime());
- System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
}
}
} catch (Exception e) {
@@ -324,7 +301,8 @@ public class MapReduceLauncher extends L
// Now wait, till we are finished.
while(!jc.allFinished()){
- jcThread.join(sleepTime);
+ try { jcThread.join(sleepTime); }
+ catch (InterruptedException e) {}
List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
@@ -343,6 +321,11 @@ public class MapReduceLauncher extends L
log.info("detailed locations: " + aliasLocation);
}
+ if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
+ log.info("More information at: http://" + jobTrackerLoc
+ + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
+ }
+
// update statistics for this job so jobId is set
MRPigStatsUtil.addJobStats(job);
MRScriptState.get().emitJobStartedNotification(
@@ -492,6 +475,10 @@ public class MapReduceLauncher extends L
for (Job job : succJobs) {
List<POStore> sts = jcc.getStores(job);
for (POStore st : sts) {
+ if (Utils.isLocal(pc, job.getJobConf())) {
+ HadoopShims.storeSchemaForLocal(job, st);
+ }
+
if (!st.isTmpStore()) {
// create an "_SUCCESS" file in output location if
// output location is a filesystem dir
@@ -757,7 +744,7 @@ public class MapReduceLauncher extends L
@SuppressWarnings("deprecation")
void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
try {
- Counters counters = MRJobStats.getCounters(job);
+ Counters counters = HadoopShims.getCounters(job);
if (counters==null)
{
long nullCounterCount =
@@ -811,13 +798,13 @@ public class MapReduceLauncher extends L
throw new ExecException(backendException);
}
try {
- Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
+ Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
if (mapRep != null) {
getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
mapRep = null;
}
- Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
+ Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
if (redRep != null) {
getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -835,6 +822,5 @@ public class MapReduceLauncher extends L
throw new ExecException(e);
}
}
-
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 24 03:34:37 2017
@@ -65,10 +65,7 @@ public class MapReduceOper extends Opera
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
public byte mapKeyType;
-
- //record the map key types of all splittees
- public byte[] mapKeyTypeOfSplittees;
-
+
//Indicates that the map plan creation
//is complete
boolean mapDone = false;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Feb 24 03:34:37 2017
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -581,17 +580,18 @@ class MultiQueryOptimizer extends MROpPl
}
private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
- Set<Byte> keyTypes = new HashSet<Byte>();
- for (MapReduceOper splittee : splittees) {
- keyTypes.add(splittee.mapKeyType);
- if (splittee.mapKeyTypeOfSplittees != null) {
- for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) {
- keyTypes.add(splittee.mapKeyTypeOfSplittees[i]);
+ boolean sameKeyType = true;
+ for (MapReduceOper outer : splittees) {
+ for (MapReduceOper inner : splittees) {
+ if (inner.mapKeyType != outer.mapKeyType) {
+ sameKeyType = false;
+ break;
}
}
-
+ if (!sameKeyType) break;
}
- return keyTypes.size() == 1;
+
+ return sameKeyType;
}
private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType)
@@ -1035,20 +1035,10 @@ class MultiQueryOptimizer extends MROpPl
splitter.mapKeyType = sameKeyType ?
mergeList.get(0).mapKeyType : DataType.TUPLE;
-
- setMapKeyTypeForSplitter(splitter,mergeList);
-
log.info("Requested parallelism of splitter: "
+ splitter.getRequestedParallelism());
}
- private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) {
- splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()];
- for (int i = 0; i < mergeList.size(); i++) {
- splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType;
- }
- }
-
private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
MapReduceOper splitter, POSplit splitOp) throws VisitorException {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 24 03:34:37 2017
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,11 +36,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -75,6 +72,7 @@ public class PigCombiner {
PhysicalOperator[] roots;
PhysicalOperator leaf;
+ PigContext pigContext = null;
private volatile boolean initialized = false;
//@StaticDataCleanup
@@ -93,11 +91,9 @@ public class PigCombiner {
Configuration jConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
- Properties log4jProperties = (Properties) ObjectSerializer
- .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
- if (log4jProperties != null) {
- PropertyConfigurator.configure(log4jProperties);
- }
+ pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
+ if (pigContext.getLog4jProperties()!=null)
+ PropertyConfigurator.configure(pigContext.getLog4jProperties());
UDFContext.getUDFContext().reset();
MapRedUtil.setupUDFContext(context.getConfiguration());
@@ -147,7 +143,7 @@ public class PigCombiner {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -161,7 +157,7 @@ public class PigCombiner {
// tuples out of the getnext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPackage.getNext()
- if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
+ if (pack.getPkgr() instanceof JoinPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
@@ -272,6 +268,7 @@ public class PigCombiner {
pigReporter = null;
// Avoid OOM in Tez.
PhysicalOperator.setReporter(null);
+ pigContext = null;
roots = null;
cp = null;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Feb 24 03:34:37 2017
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +45,6 @@ import org.apache.pig.data.SchemaTupleBa
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -90,6 +88,7 @@ public abstract class PigGenericMapBase
private PhysicalOperator leaf;
+ PigContext pigContext = null;
private volatile boolean initialized = false;
/**
@@ -169,15 +168,13 @@ public abstract class PigGenericMapBase
inIllustrator = inIllustrator(context);
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
+ pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(job);
+ SchemaTupleBackend.initialize(job, pigContext);
- Properties log4jProperties = (Properties) ObjectSerializer
- .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
- if (log4jProperties != null) {
- PropertyConfigurator.configure(log4jProperties);
- }
+ if (pigContext.getLog4jProperties()!=null)
+ PropertyConfigurator.configure(pigContext.getLog4jProperties());
if (mp == null)
mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -239,7 +236,7 @@ public abstract class PigGenericMapBase
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -252,7 +249,8 @@ public abstract class PigGenericMapBase
MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
- store.setUp();
+ if (!pigContext.inIllustrator)
+ store.setUp();
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Feb 24 03:34:37 2017
@@ -287,6 +287,7 @@ public class PigGenericMapReduce {
private PhysicalOperator leaf;
+ PigContext pigContext = null;
protected volatile boolean initialized = false;
private boolean inIllustrator = false;
@@ -318,9 +319,10 @@ public class PigGenericMapReduce {
sJobConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
+ pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(jConf);
+ SchemaTupleBackend.initialize(jConf, pigContext);
if (rp == null)
rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -375,7 +377,7 @@ public class PigGenericMapReduce {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -606,7 +608,7 @@ public class PigGenericMapReduce {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Fri Feb 24 03:34:37 2017
@@ -17,6 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.util.Map;
+import java.util.WeakHashMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.EvalFunc;
@@ -38,6 +41,7 @@ public final class PigHadoopLogger imple
private PigStatusReporter reporter = null;
private boolean aggregate = false;
+ private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
private PigHadoopLogger() {
}
@@ -64,6 +68,11 @@ public final class PigHadoopLogger imple
if (getAggregate()) {
if (reporter != null) {
+ // log at least once
+ if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
+ log.warn(displayMessage);
+ msgMap.put(o, displayMessage);
+ }
if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
reporter.incrCounter(className, warningEnum.name(), 1);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 24 03:34:37 2017
@@ -197,11 +197,14 @@ public class PigInputFormat extends Inpu
ArrayList<FileSpec> inputs;
ArrayList<ArrayList<OperatorKey>> inpTargets;
+ PigContext pigContext;
try {
inputs = (ArrayList<FileSpec>) ObjectSerializer
.deserialize(conf.get(PIG_INPUTS));
inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
.deserialize(conf.get(PIG_INPUT_TARGETS));
+ pigContext = (PigContext) ObjectSerializer.deserialize(conf
+ .get("pig.pigContext"));
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
MapRedUtil.setupUDFContext(conf);
} catch (Exception e) {
@@ -231,7 +234,7 @@ public class PigInputFormat extends Inpu
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
- if(!Utils.isLocal(conf)) {
+ if(!Utils.isLocal(pigContext, conf)) {
fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
}
@@ -267,7 +270,7 @@ public class PigInputFormat extends Inpu
jobcontext.getJobID()));
List<InputSplit> oneInputPigSplits = getPigSplits(
oneInputSplits, i, inpTargets.get(i),
- fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
+ HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
combinable, confClone);
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Feb 24 03:34:37 2017
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@@ -155,7 +156,12 @@ public class PigOutputCommitter extends
for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
if (mapCommitter.first!=null) {
try {
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
+ // Use reflection, Hadoop 1.x line does not have such method
+ Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
+ && (Boolean)m.invoke(mapCommitter.first);
+ } catch (NoSuchMethodException e) {
+ allOutputCommitterSupportRecovery = false;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -167,7 +173,12 @@ public class PigOutputCommitter extends
reduceOutputCommitters) {
if (reduceCommitter.first!=null) {
try {
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
+ // Use reflection, Hadoop 1.x line does not have such method
+ Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
+ && (Boolean)m.invoke(reduceCommitter.first);
+ } catch (NoSuchMethodException e) {
+ allOutputCommitterSupportRecovery = false;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -186,7 +197,10 @@ public class PigOutputCommitter extends
mapCommitter.second);
try {
// Use reflection, Hadoop 1.x line does not have such method
- mapCommitter.first.recoverTask(updatedContext);
+ Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
+ m.invoke(mapCommitter.first, updatedContext);
+ } catch (NoSuchMethodException e) {
+ // We are using Hadoop 1.x, ignore
} catch (Exception e) {
throw new IOException(e);
}
@@ -198,7 +212,11 @@ public class PigOutputCommitter extends
TaskAttemptContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- reduceCommitter.first.recoverTask(updatedContext);
+ // Use reflection, Hadoop 1.x line does not have such method
+ Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
+ m.invoke(reduceCommitter.first, updatedContext);
+ } catch (NoSuchMethodException e) {
+ // We are using Hadoop 1.x, ignore
} catch (Exception e) {
throw new IOException(e);
}
@@ -238,7 +256,10 @@ public class PigOutputCommitter extends
mapCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- mapCommitter.first.commitJob(updatedContext);
+ // Use reflection, 20.2 does not have such method
+ Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
+ m.setAccessible(true);
+ m.invoke(mapCommitter.first, updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -252,7 +273,10 @@ public class PigOutputCommitter extends
reduceCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- reduceCommitter.first.commitJob(updatedContext);
+ // Use reflection, 20.2 does not have such method
+ Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
+ m.setAccessible(true);
+ m.invoke(reduceCommitter.first, updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -269,7 +293,10 @@ public class PigOutputCommitter extends
JobContext updatedContext = setUpContext(context,
mapCommitter.second);
try {
- mapCommitter.first.abortJob(updatedContext, state);
+ // Use reflection, 20.2 does not have such method
+ Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+ m.setAccessible(true);
+ m.invoke(mapCommitter.first, updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
@@ -282,7 +309,10 @@ public class PigOutputCommitter extends
JobContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- reduceCommitter.first.abortJob(updatedContext, state);
+ // Use reflection, 20.2 does not have such method
+ Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+ m.setAccessible(true);
+ m.invoke(reduceCommitter.first, updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Feb 24 03:34:37 2017
@@ -515,11 +515,9 @@ public class PigSplit extends InputSplit
for (int i = 0; i < wrappedSplits.length; i++) {
st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n ClassName: " +
wrappedSplits[i].getClass().getName() + "\n Locations:\n");
- if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) {
- for (String location : wrappedSplits[i].getLocations())
- st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
- }
+ for (String location : wrappedSplits[i].getLocations())
+ st.append(" "+location+"\n");
+ st.append("\n-----------------------\n");
}
} catch (IOException e) {
return null;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Fri Feb 24 03:34:37 2017
@@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe
Random rGen;
float[] probVec;
float epsilon = 0.0001f;
-
+
private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-
- public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
- rGen = new Random(seed);
+
+ public DiscreteProbabilitySampleGenerator(float[] probVec) {
+ rGen = new Random();
float sum = 0.0f;
for (float f : probVec) {
sum += f;
}
this.probVec = probVec;
- if (1-epsilon > sum || sum > 1+epsilon) {
+ if (1-epsilon > sum || sum > 1+epsilon) {
LOG.info("Sum of probabilities should be near one: " + sum);
}
}
-
+
public int getNext(){
double toss = rGen.nextDouble();
// if the uniformly random number that I generated
@@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe
toss -= probVec[i];
if(toss<=0.0)
return i;
- }
+ }
return lastIdx;
}
-
+
public static void main(String[] args) {
float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
- DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
+ DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
CountingMap<Integer> cm = new CountingMap<Integer>();
for(int i=0;i<100;i++){
cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe
public String toString() {
return Arrays.toString(probVec);
}
-
-
+
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Feb 24 03:34:37 2017
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,13 +31,13 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.NullableBigDecimalWritable;
import org.apache.pig.impl.io.NullableBigIntegerWritable;
@@ -51,6 +52,7 @@ import org.apache.pig.impl.io.NullableTe
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
@@ -60,6 +62,7 @@ public class WeightedRangePartitioner ex
new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
protected PigNullableWritable[] quantiles;
protected RawComparator<PigNullableWritable> comparator;
+ private PigContext pigContext;
protected Configuration job;
protected boolean inited = false;
@@ -90,6 +93,11 @@ public class WeightedRangePartitioner ex
@SuppressWarnings("unchecked")
public void init() {
weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+ try {
+ pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize pig context: ", e);
+ }
String quantilesFile = job.get("pig.quantilesFile", "");
if (quantilesFile.length() == 0) {
@@ -101,10 +109,10 @@ public class WeightedRangePartitioner ex
// use local file system to get the quantilesFile
Map<String, Object> quantileMap = null;
Configuration conf;
- if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) {
- conf = new Configuration(false);
+ if (!pigContext.getExecType().isLocal()) {
+ conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
} else {
- conf = new Configuration(job);
+ conf = new Configuration(false);
}
if (job.get("fs.file.impl") != null) {
conf.set("fs.file.impl", job.get("fs.file.impl"));
@@ -130,13 +138,11 @@ public class WeightedRangePartitioner ex
DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
- long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
- long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
Tuple key = (Tuple)ent.getKey(); // sample item which repeats
float[] probVec = getProbVec((Tuple)ent.getValue());
weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
+ new DiscreteProbabilitySampleGenerator(probVec));
}
}
// else - the quantiles file is empty - unless we have a bug, the
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Feb 24 03:34:37 2017
@@ -21,16 +21,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -107,7 +105,7 @@ public class EndOfAllInputSetter extends
public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
endOfAllInputFlag = true;
}
-
+
@Override
public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
endOfAllInputFlag = true;
@@ -124,13 +122,6 @@ public class EndOfAllInputSetter extends
}
}
- @Override
- public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
- if (lr instanceof POBuildBloomRearrangeTez) {
- endOfAllInputFlag = true;
- }
- super.visitLocalRearrange(lr);
- }
/**
* @return if end of all input is present
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Fri Feb 24 03:34:37 2017
@@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV
* @param plan MR plan to print
*/
public MRPrinter(PrintStream ps, MROperPlan plan) {
- super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true));
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
mStream = ps;
mStream.println("#--------------------------------------------------");
mStream.println("# Map Reduce Plan ");
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Feb 24 03:34:37 2017
@@ -441,10 +441,6 @@ public abstract class PhysicalOperator e
public void reset() {
}
- public boolean isEndOfAllInput() {
- return parentPlan.endOfAllInput;
- }
-
/**
* @return PigProgressable stored in threadlocal
*/
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Fri Feb 24 03:34:37 2017
@@ -19,10 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.math.RoundingMode;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -39,8 +36,6 @@ public class Divide extends BinaryExpres
*
*/
private static final long serialVersionUID = 1L;
- public static final short BIGDECIMAL_MINIMAL_SCALE = 6;
- private static final Log LOG = LogFactory.getLog(Divide.class);
public Divide(OperatorKey k) {
super(k);
@@ -77,22 +72,12 @@ public class Divide extends BinaryExpres
case DataType.BIGINTEGER:
return ((BigInteger) a).divide((BigInteger) b);
case DataType.BIGDECIMAL:
- return bigDecimalDivideWithScale(a, b);
+ return ((BigDecimal) a).divide((BigDecimal) b);
default:
throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
}
}
- private Number bigDecimalDivideWithScale(Number a, Number b) {
- // Using same result scaling as Hive. See Arithmetic Rules:
- // https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf
- int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1);
- if (LOG.isDebugEnabled()) {
- LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale.");
- }
- return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP);
- }
-
/*
* This method is used to invoke the appropriate method, as Java does not provide generic
* dispatch for it.
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Feb 24 03:34:37 2017
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
@@ -90,8 +89,6 @@ public class POCast extends ExpressionOp
caster = ((LoadFunc)obj).getLoadCaster();
} else if (obj instanceof StreamToPig) {
caster = ((StreamToPig)obj).getLoadCaster();
- } else if (obj instanceof EvalFunc) {
- caster = ((EvalFunc)obj).getLoadCaster();
} else {
throw new IOException("Invalid class type "
+ funcSpec.getClassName());
@@ -168,7 +165,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBigInteger(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "BigInteger.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -284,7 +281,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBigDecimal(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "BigDecimal.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -399,7 +396,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBoolean(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "boolean.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -513,7 +510,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToInteger(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "int.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -639,7 +636,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToLong(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "long.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -762,7 +759,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToDouble(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "double.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -884,7 +881,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToFloat(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "float.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1010,7 +1007,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToDateTime(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "datetime.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1121,7 +1118,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToCharArray(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "string.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1273,7 +1270,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToTuple(dba.get(), fieldSchema);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "tuple.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1335,7 +1332,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBag(((DataByteArray)obj).get(), fs);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "bag.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} else {
@@ -1366,7 +1363,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToTuple(((DataByteArray)obj).get(), fs);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "tuple.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} else {
@@ -1391,7 +1388,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToMap(((DataByteArray)obj).get(), fs);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "tuple.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} else {
@@ -1405,7 +1402,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBoolean(((DataByteArray) obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "int.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1444,7 +1441,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToInteger(((DataByteArray) obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "int.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1490,7 +1487,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToDouble(((DataByteArray) obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "double.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1536,7 +1533,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToLong(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "long.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1582,7 +1579,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToFloat(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "float.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1628,7 +1625,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToDateTime(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "datetime.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1667,7 +1664,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToCharArray(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "float.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1715,7 +1712,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBigInteger(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "BigInteger.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1760,7 +1757,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBigDecimal(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "BigDecimal.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1798,10 +1795,6 @@ public class POCast extends ExpressionOp
default:
throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
}
- case DataType.BYTEARRAY:
- //no-op (PIG-4933)
- result = obj;
- break;
default:
throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT);
}
@@ -1868,7 +1861,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBag(dba.get(), fieldSchema);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "bag.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1959,7 +1952,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToMap(dba.get(), fieldSchema);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations();
+ String msg = unknownByteArrayErrorMessage + "map.";
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Fri Feb 24 03:34:37 2017
@@ -158,19 +158,23 @@ public class POProject extends Expressio
illustratorMarkup(inpValue, res.result, -1);
return res;
} else if(columns.size() == 1) {
- if ( inpValue == null ) {
- // the tuple is null, so a dereference should also produce a null
- res.returnStatus = POStatus.STATUS_OK;
- ret = null;
- } else if( inpValue.size() > columns.get(0) ) {
+ try {
ret = inpValue.get(columns.get(0));
- } else {
+ } catch (IndexOutOfBoundsException ie) {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " +
"which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
res.returnStatus = POStatus.STATUS_OK;
ret = null;
+ } catch (NullPointerException npe) {
+ // the tuple is null, so a dereference should also produce a null
+ // there is a slight danger here that the Tuple implementation
+ // may have given the exception for a different reason but if we
+ // don't catch it, we will die and the most common case for the
+ // exception would be because the tuple is null
+ res.returnStatus = POStatus.STATUS_OK;
+ ret = null;
}
} else if(isProjectToEnd){
ret = getRangeTuple(inpValue);
@@ -211,18 +215,23 @@ public class POProject extends Expressio
*/
private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i)
throws ExecException {
- if( inpValue == null ) {
- // the tuple is null, so a dereference should also produce a null
- objList.add(null);
- } else if( inpValue.size() > i ) {
+ try {
objList.add(inpValue.get(i));
- } else {
+ } catch (IndexOutOfBoundsException ie) {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " + i +
" which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
objList.add(null);
}
+ catch (NullPointerException npe) {
+ // the tuple is null, so a dereference should also produce a null
+ // there is a slight danger here that the Tuple implementation
+ // may have given the exception for a different reason but if we
+ // don't catch it, we will die and the most common case for the
+ // exception would be because the tuple is null
+ objList.add(null);
+ }
}
@Override
@@ -397,17 +406,21 @@ public class POProject extends Expressio
Object ret;
if(columns.size() == 1) {
- if( inpValue == null ) {
- // the tuple is null, so a dereference should also produce a null
- ret = null;
- } else if( inpValue.size() > columns.get(0) ) {
+ try{
ret = inpValue.get(columns.get(0));
- } else {
+ } catch (IndexOutOfBoundsException ie) {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " +
"which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
ret = null;
+ } catch (NullPointerException npe) {
+ // the tuple is null, so a dereference should also produce a null
+ // there is a slight danger here that the Tuple implementation
+ // may have given the exception for a different reason but if we
+ // don't catch it, we will die and the most common case for the
+ // exception would be because the tuple is null
+ ret = null;
}
} else if(isProjectToEnd) {
ret = getRangeTuple(inpValue);
@@ -415,17 +428,21 @@ public class POProject extends Expressio
ArrayList<Object> objList = new ArrayList<Object>(columns.size());
for(int col: columns) {
- if( inpValue == null ) {
- // the tuple is null, so a dereference should also produce a null
- objList.add(null);
- } else if( inpValue.size() > col ) {
+ try {
objList.add(inpValue.get(col));
- } else {
+ } catch (IndexOutOfBoundsException ie) {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " +
"which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
objList.add(null);
+ } catch (NullPointerException npe) {
+ // the tuple is null, so a dereference should also produce a null
+ // there is a slight danger here that the Tuple implementation
+ // may have given the exception for a different reason but if we
+ // don't catch it, we will die and the most common case for the
+ // exception would be because the tuple is null
+ objList.add(null);
}
}
ret = mTupleFactory.newTuple(objList);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Fri Feb 24 03:34:37 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
private Map<Integer, Integer> keyLookup;
private int numBags;
-
+
private transient boolean initialized;
private transient boolean useDefaultBag;
@@ -77,15 +77,6 @@ public class CombinerPackager extends Pa
}
}
- @Override
- public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
- throws ExecException {
- this.key = key;
- this.bags = bags;
- this.readOnce = readOnce;
- // Bag can be read directly and need not be materialized again
- }
-
/**
* @param keyInfo the keyInfo to set
*/
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Fri Feb 24 03:34:37 2017
@@ -17,7 +17,7 @@
*/
/**
- *
+ *
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -49,15 +48,6 @@ public class LitePackager extends Packag
private PigNullableWritable keyWritable;
@Override
- public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
- throws ExecException {
- this.key = key;
- this.bags = bags;
- this.readOnce = readOnce;
- // Bag can be read directly and need not be materialized again
- }
-
- @Override
public boolean[] getInner() {
return null;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Fri Feb 24 03:34:37 2017
@@ -256,9 +256,4 @@ public class POCross extends PhysicalOpe
data = null;
}
- @Override
- public void reset() {
- clearMemory();
- }
-
}