You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/06/19 21:56:03 UTC
svn commit: r669666 [2/3] - in /incubator/pig/branches/types:
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/local/executionengine/
src/org/apache/pig/builtin/ src/org/apache/pig/data/
src/org/apache/pig/impl/logicalLayer...
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Thu Jun 19 12:56:00 2008
@@ -22,10 +22,10 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-public class Launcher {
+public abstract class Launcher {
private static final Log log = LogFactory.getLog(Launcher.class);
- int totalHadoopTimeSpent;
+ long totalHadoopTimeSpent;
protected Launcher(){
totalHadoopTimeSpent = 0;
@@ -59,63 +59,15 @@
* @throws ExecException
* @throws JobCreationException
*/
- protected boolean launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
+ public abstract boolean launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
throws PlanException, VisitorException, IOException, ExecException,
- JobCreationException {
- long sleepTime = 500;
- MRCompiler comp = new MRCompiler(php, pc);
- comp.compile();
-
- ExecutionEngine exe = pc.getExecutionEngine();
- Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
- JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
-
- MROperPlan mrp = comp.getMRPlan();
- JobControlCompiler jcc = new JobControlCompiler();
-
- JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-
- new Thread(jc).start();
-
- int numMRJobs = jc.getWaitingJobs().size();
- double lastProg = -1;
- while(!jc.allFinished()){
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {}
- double prog = calculateProgress(jc, jobClient)/numMRJobs;
- if(prog>lastProg)
- log.info(prog * 100 + "% complete");
- lastProg = prog;
- }
- lastProg = calculateProgress(jc, jobClient)/numMRJobs;
- if(isComplete(lastProg))
- log.info("Completed Successfully");
- else{
- log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
- List<Job> failedJobs = jc.getFailedJobs();
- if(failedJobs==null)
- throw new ExecException("Something terribly wrong with Job Control.");
- for (Job job : failedJobs) {
- getStats(job,jobClient);
- }
- }
- List<Job> succJobs = jc.getSuccessfulJobs();
- if(succJobs!=null)
- for(Job job : succJobs){
- getStats(job,jobClient);
- }
-
- jc.stop();
-
- return isComplete(lastProg);
- }
+ JobCreationException;
- private boolean isComplete(double prog){
+ protected boolean isComplete(double prog){
return (int)(Math.ceil(prog)) == (int)1;
}
- private void getStats(Job job, JobClient jobClient) throws IOException{
+ protected void getStats(Job job, JobClient jobClient) throws IOException{
String MRJobID = job.getMapredJobID();
TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
getErrorMessages(mapRep, "map");
@@ -125,15 +77,15 @@
totalHadoopTimeSpent += computeTimeSpent(mapRep);
}
- private int computeTimeSpent(TaskReport[] mapReports) {
- int timeSpent = 0;
+ protected long computeTimeSpent(TaskReport[] mapReports) {
+ long timeSpent = 0;
for (TaskReport r : mapReports) {
timeSpent += (r.getFinishTime() - r.getStartTime());
}
return timeSpent;
}
- protected static void getErrorMessages(TaskReport reports[], String type)
+ protected void getErrorMessages(TaskReport reports[], String type)
{
for (int i = 0; i < reports.length; i++) {
String msgs[] = reports[i].getDiagnostics();
@@ -154,7 +106,7 @@
* @return The progress as a precentage in double format
* @throws IOException
*/
- protected static double calculateProgress(JobControl jc, JobClient jobClient) throws IOException{
+ protected double calculateProgress(JobControl jc, JobClient jobClient) throws IOException{
double prog = 0.0;
prog += jc.getSuccessfulJobs().size();
@@ -176,7 +128,7 @@
* @return Returns the percentage progress of this Job
* @throws IOException
*/
- protected static double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
+ protected double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
String mrJobID = j.getMapredJobID();
RunningJob rj = jobClient.getJob(mrJobID);
if(rj==null && j.getState()==Job.SUCCESS)
@@ -189,7 +141,7 @@
return (mapProg + redProg)/2;
}
}
- public int getTotalHadoopTimeSpent() {
+ public long getTotalHadoopTimeSpent() {
return totalHadoopTimeSpent;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Thu Jun 19 12:56:00 2008
@@ -1,9 +1,18 @@
package org.apache.pig.impl.mapReduceLayer;
import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -11,6 +20,8 @@
public class LocalLauncher extends Launcher{
+ private static final Log log = LogFactory.getLog(Launcher.class);
+
@Override
public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
String grpName,
@@ -19,6 +30,117 @@
IOException,
ExecException,
JobCreationException {
- return super.launchPig(php, grpName, pc);
+ long sleepTime = 500;
+ MRCompiler comp = new MRCompiler(php, pc);
+ comp.compile();
+
+ Configuration conf = new Configuration();
+ conf.set("mapred.job.tracker", "local");
+ JobClient jobClient = new JobClient(new JobConf(conf));
+
+ MROperPlan mrp = comp.getMRPlan();
+ JobControlCompiler jcc = new JobControlCompiler();
+
+ JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+
+
+ int numMRJobs = jc.getWaitingJobs().size();
+
+ new Thread(jc).start();
+
+ double lastProg = -1;
+ while(!jc.allFinished()){
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ double prog = calculateProgress(jc, jobClient)/numMRJobs;
+ if(prog>lastProg)
+ log.info(prog * 100 + "% complete");
+ lastProg = prog;
+ }
+ lastProg = calculateProgress(jc, jobClient)/numMRJobs;
+ if(isComplete(lastProg))
+ log.info("Completed Successfully");
+ else{
+ log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
+ List<Job> failedJobs = jc.getFailedJobs();
+ if(failedJobs==null)
+ throw new ExecException("Something terribly wrong with Job Control.");
+ for (Job job : failedJobs) {
+ getStats(job,jobClient);
+ }
+ }
+ List<Job> succJobs = jc.getSuccessfulJobs();
+ if(succJobs!=null)
+ for(Job job : succJobs){
+ getStats(job,jobClient);
+ }
+
+ jc.stop();
+
+ return isComplete(lastProg);
+ }
+
+ //A purely testing method. Not to be used elsewhere
+ public boolean launchPigWithCombinePlan(PhysicalPlan<PhysicalOperator> php,
+ String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
+ VisitorException, IOException, ExecException, JobCreationException {
+ long sleepTime = 500;
+ MRCompiler comp = new MRCompiler(php, pc);
+ comp.compile();
+
+ Configuration conf = new Configuration();
+ conf.set("mapred.job.tracker", "local");
+ JobClient jobClient = new JobClient(new JobConf(conf));
+
+ MROperPlan mrp = comp.getMRPlan();
+ if(mrp.getLeaves().get(0)!=mrp.getRoots().get(0))
+ throw new PlanException("Unsupported configuration to test combine plan");
+
+ MapReduceOper mro = mrp.getLeaves().get(0);
+ mro.combinePlan = combinePlan;
+
+ JobControlCompiler jcc = new JobControlCompiler();
+
+ JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+
+ int numMRJobs = jc.getWaitingJobs().size();
+
+ new Thread(jc).start();
+
+ double lastProg = -1;
+ while (!jc.allFinished()) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ double prog = calculateProgress(jc, jobClient) / numMRJobs;
+ if (prog > lastProg)
+ log.info(prog * 100 + "% complete");
+ lastProg = prog;
+ }
+ lastProg = calculateProgress(jc, jobClient) / numMRJobs;
+ if (isComplete(lastProg))
+ log.info("Completed Successfully");
+ else {
+ log.info("Unsuccessful attempt. Completed " + lastProg * 100
+ + "% of the job");
+ List<Job> failedJobs = jc.getFailedJobs();
+ if (failedJobs == null)
+ throw new ExecException(
+ "Something terribly wrong with Job Control.");
+ for (Job job : failedJobs) {
+ getStats(job, jobClient);
+ }
+ }
+ List<Job> succJobs = jc.getSuccessfulJobs();
+ if (succJobs != null)
+ for (Job job : succJobs) {
+ getStats(job, jobClient);
+ }
+
+ jc.stop();
+
+ return isComplete(lastProg);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu Jun 19 12:56:00 2008
@@ -398,6 +398,7 @@
private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
+ if(compiledInputs.length>1) throw new PlanException("Received a multi input plan when expecting only a single input one.");
MapReduceOper mro = compiledInputs[0];
POStore str = getStore();
str.setSFile(fSpec);
@@ -779,7 +780,7 @@
POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
prj.setColumn(1);
prj.setOverloaded(false);
- prj.setResultType(DataType.BYTEARRAY);
+ prj.setResultType(DataType.BAG);
ep.add(prj);
List<ExprPlan> eps2 = new ArrayList<ExprPlan>();
eps2.add(ep);
@@ -798,10 +799,13 @@
return mro;
}
- public MapReduceOper getQuantileJob(POSort sort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
+ public MapReduceOper getQuantileJob(POSort inpSort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),RandomSampleLoader.class.getName());
MapReduceOper mro = startNew(quantLdFilName, prevJob);
mro.UDFs.add(FindQuantiles.class.getName());
+ POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
+ .getRequestedParallelism(), null, inpSort.getSortPlans(),
+ inpSort.getMAscCols(), inpSort.getMSortFunc());
if(sort.isUDFComparatorUsed)
mro.UDFs.add(sort.getMSortFunc().getFuncSpec());
@@ -832,6 +836,7 @@
ExprPlan ep1 = new ExprPlan();
ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
ce.setValue("all");
+ ce.setResultType(DataType.CHARARRAY);
ep1.add(ce);
List<ExprPlan> eps = new ArrayList<ExprPlan>();
@@ -858,8 +863,8 @@
POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
topPrj.setColumn(1);
- topPrj.setOverloaded(true);
topPrj.setResultType(DataType.TUPLE);
+ topPrj.setOverloaded(true);
fe2Plan.add(topPrj);
ExprPlan nesSortPlan = new ExprPlan();
@@ -872,19 +877,22 @@
nesSortPlanLst.add(nesSortPlan);
sort.setSortPlans(nesSortPlanLst);
+ sort.setResultType(DataType.TUPLE);
fe2Plan.add(sort);
fe2Plan.connect(topPrj, sort);
ExprPlan ep3 = new ExprPlan();
POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prjStar3.setResultType(DataType.TUPLE);
- prjStar3.setStar(true);
+ prjStar3.setResultType(DataType.BAG);
+ prjStar3.setColumn(0);
+ prjStar3.setStar(false);
ep3.add(prjStar3);
ExprPlan rpep = new ExprPlan();
ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
rpce.setRequestedParallelism(rp);
rpce.setValue(rp<=0?1:rp);
+ rpce.setResultType(DataType.INTEGER);
rpep.add(rpce);
List<ExprPlan> genEps = new ArrayList<ExprPlan>();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Thu Jun 19 12:56:00 2008
@@ -1,9 +1,20 @@
package org.apache.pig.impl.mapReduceLayer;
import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -14,7 +25,7 @@
*
*/
public class MapReduceLauncher extends Launcher{
-
+ private static final Log log = LogFactory.getLog(Launcher.class);
@Override
public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
String grpName,
@@ -23,6 +34,53 @@
IOException,
ExecException,
JobCreationException {
- return super.launchPig(php, grpName, pc);
+ long sleepTime = 500;
+ MRCompiler comp = new MRCompiler(php, pc);
+ comp.compile();
+
+ ExecutionEngine exe = pc.getExecutionEngine();
+ Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
+ JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+
+ MROperPlan mrp = comp.getMRPlan();
+ JobControlCompiler jcc = new JobControlCompiler();
+
+ JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+
+ int numMRJobs = jc.getWaitingJobs().size();
+
+ new Thread(jc).start();
+
+ double lastProg = -1;
+ while(!jc.allFinished()){
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ double prog = calculateProgress(jc, jobClient)/numMRJobs;
+ if(prog>lastProg)
+ log.info(prog * 100 + "% complete");
+ lastProg = prog;
+ }
+ lastProg = calculateProgress(jc, jobClient)/numMRJobs;
+ if(isComplete(lastProg))
+ log.info("Completed Successfully");
+ else{
+ log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% of the job");
+ List<Job> failedJobs = jc.getFailedJobs();
+ if(failedJobs==null)
+ throw new ExecException("Something terribly wrong with Job Control.");
+ for (Job job : failedJobs) {
+ getStats(job,jobClient);
+ }
+ }
+ List<Job> succJobs = jc.getSuccessfulJobs();
+ if(succJobs!=null)
+ for(Job job : succJobs){
+ getStats(job,jobClient);
+ }
+
+ jc.stop();
+
+ return isComplete(lastProg);
}
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java?rev=669666&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java Thu Jun 19 12:56:00 2008
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.TargetedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * This class is the static Mapper & Reducer classes that
+ * are used by Pig to execute Pig Map Reduce jobs. Since
+ * there is a reduce phase, the leaf is bound to be a
+ * POLocalRearrange. So the map phase has to separate the
+ * key and indexed tuple and collect it into the output
+ * collector.
+ *
+ * The shuffle and sort phase sorts these key & indexed tuples
+ * and creates key, List<IndexedTuple> and passes the key and
+ * iterator to the list. The deserialized POPackage operator
+ * is used to package the key, List<IndexedTuple> into pigKey,
+ * Bag<Tuple> where pigKey is of the appropriate pig type and
+ * then the result of the package is attached to the reduce
+ * plan which is executed if its not empty. Either the result
+ * of the reduce plan or the package res is collected into
+ * the output collector.
+ *
+ */
+public class PigCombiner {
+
+ public static JobConf sJobConf = null;
+
+ public static class Combine extends MapReduceBase
+ implements
+ Reducer<WritableComparable, IndexedTuple, WritableComparable, Writable> {
+ private final Log log = LogFactory.getLog(getClass());
+
+ //The reduce plan
+ private PhysicalPlan<PhysicalOperator> cp;
+
+ //The POPackage operator which is the
+ //root of every Map Reduce plan is
+ //obtained through the job conf. The portion
+ //remaining after its removal is the reduce
+ //plan
+ private POPackage pack;
+
+ ProgressableReporter pigReporter;
+
+ /**
+ * Configures the Reduce plan, the POPackage operator
+ * and the reporter thread
+ */
+ @Override
+ public void configure(JobConf jConf) {
+ super.configure(jConf);
+ sJobConf = jConf;
+ try {
+ cp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(jConf
+ .get("pig.combinePlan"));
+ pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.combine.package"));
+ // To be removed
+ if(cp.isEmpty())
+ log.debug("Combine Plan empty!");
+ else{
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ cp.explain(baos);
+ log.debug(baos.toString());
+ }
+ // till here
+
+ long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000);
+
+ pigReporter = new ProgressableReporter();
+ } catch (IOException e) {
+ log.error(e.getMessage() + "was caused by:");
+ log.error(e.getCause().getMessage());
+ }
+ }
+
+ /**
+ * The reduce function which packages the key and List<IndexedTuple>
+ * into key, Bag<Tuple> after converting Hadoop type key into Pig type.
+ * The package result is either collected as is, if the reduce plan is
+ * empty or after passing through the reduce plan.
+ */
+ public void reduce(WritableComparable key,
+ Iterator<IndexedTuple> indInp,
+ OutputCollector<WritableComparable, Writable> oc,
+ Reporter reporter) throws IOException {
+
+ pigReporter.setRep(reporter);
+
+ Object k = DataType.convertToPigType(key);
+ pack.attachInput(k, indInp);
+
+ try {
+ Tuple t=null;
+ Result res = pack.getNext(t);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ Tuple packRes = (Tuple)res.result;
+
+ if(cp.isEmpty()){
+ oc.collect(null, packRes);
+ return;
+ }
+
+ cp.attachInput(packRes);
+
+ List<PhysicalOperator> leaves = cp.getLeaves();
+
+ PhysicalOperator leaf = leaves.get(0);
+ while(true){
+ Result redRes = leaf.getNext(t);
+
+ if(redRes.returnStatus==POStatus.STATUS_OK){
+// oc.collect(null, (Tuple)redRes.result);
+ Tuple tuple = (Tuple)redRes.result;
+ Object combKey = tuple.get(0);
+ IndexedTuple it = (IndexedTuple)tuple.get(1);
+ WritableComparable wcKey = DataType.getWritableComparableTypes(combKey);
+ oc.collect(wcKey, it);
+ continue;
+ }
+
+ if(redRes.returnStatus==POStatus.STATUS_EOP)
+ return;
+
+ if(redRes.returnStatus==POStatus.STATUS_NULL)
+ continue;
+
+ if(redRes.returnStatus==POStatus.STATUS_ERR){
+ IOException ioe = new IOException("Received Error while " +
+ "processing the reduce plan.");
+ throw ioe;
+ }
+ }
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL)
+ return;
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ IOException ioe = new IOException("Packaging error while processing group");
+ throw ioe;
+ }
+
+
+ } catch (ExecException e) {
+ IOException ioe = new IOException(e.getMessage());
+ ioe.initCause(e.getCause());
+ throw ioe;
+ }
+ }
+
+
+ /**
+ * Will be called once all the intermediate keys and values are
+ * processed. So right place to stop the reporter thread.
+ */
+ @Override
+ public void close() throws IOException {
+ super.close();
+ /*if(runnableReporter!=null)
+ runnableReporter.setDone(true);*/
+ PhysicalOperator.setReporter(null);
+ }
+ }
+
+ /*interface MapOutputCollector<K extends WritableComparable, V extends Writable>
+ extends OutputCollector<K, V> {
+
+ public void close() throws IOException;
+
+ public void flush() throws IOException;
+
+ }
+
+ static class DirectMapOutputCollector<K extends WritableComparable, V extends Writable>
+ implements MapOutputCollector<K, V> {
+
+ private RecordWriter<K, V> out = null;
+
+ private Reporter reporter = null;
+
+ @SuppressWarnings("unchecked")
+ public DirectMapOutputCollector(JobConf job, Reporter reporter)
+ throws IOException {
+ this.reporter = reporter;
+ String finalName = job.getOutputPath().toString();
+ FileSystem fs = FileSystem.get(job);
+
+ out = job.getOutputFormat().getRecordWriter(fs, job, finalName,
+ reporter);
+ }
+
+ public void close() throws IOException {
+ if (this.out != null) {
+ out.close(this.reporter);
+ }
+
+ }
+
+ public void flush() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void collect(K key, V value) throws IOException {
+ System.out.println(value.toString());
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ Random r = new Random();
+ PhysicalPlan<PhysicalOperator> rp = new PhysicalPlan<PhysicalOperator>();
+ POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
+ rp.add(fe);
+ PigMapReduce.Reduce red = new PigMapReduce.Reduce();
+ POPackage pk = GenPhyOp.topPackageOp();
+ pk.setKeyType(DataType.INTEGER);
+ pk.setNumInps(1);
+ boolean[] inner = {false};
+ pk.setInner(inner);
+
+ JobConf jConf = new JobConf();
+ jConf.set("pig.reducePlan", ObjectSerializer.serialize(rp));
+ jConf.set("pig.reduce.package",ObjectSerializer.serialize(pk));
+ jConf.setOutputFormat(PigOutputFormat.class);
+ jConf.setOutputPath(new Path("pigmrtst1"));
+ red.configure(jConf);
+
+ WritableComparable key = new IntWritable(1);
+ List<IndexedTuple> itLst = new ArrayList<IndexedTuple>();
+ for(int i=0;i<2;i++){
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(GenRandomData.genRandString(r));
+ t.append(1);
+ IndexedTuple it = new IndexedTuple(t,0);
+ itLst.add(it);
+ }
+ red.reduce(key,itLst.iterator(),new DirectMapOutputCollector(jConf,reporter), reporter);
+ }*/
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Thu Jun 19 12:56:00 2008
@@ -39,16 +39,11 @@
WritableComparator comparator;
public int getPartition(WritableComparable key, Writable value,
- int numPartitions) {
- try{
- Tuple keyTuple = (Tuple)key;
- int index = Arrays.binarySearch(quantiles, (Tuple)keyTuple.get(0), comparator);
- if (index < 0)
- index = -index-1;
- return Math.min(index, numPartitions - 1);
- }catch(ExecException e){
- throw new RuntimeException(e);
- }
+ int numPartitions){
+ int index = Arrays.binarySearch(quantiles, key, comparator);
+ if (index < 0)
+ index = -index-1;
+ return Math.min(index, numPartitions - 1);
}
public void configure(JobConf job) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java Thu Jun 19 12:56:00 2008
@@ -33,4 +33,11 @@
returnStatus = POStatus.STATUS_ERR;
result = null;
}
+
+ @Override
+ public String toString() {
+ return (result!=null)?result.toString():"NULL";
+ }
+
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
@Override
public String name() {
- return "Add - " + mKey.toString();
+ return "Add" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java Thu Jun 19 12:56:00 2008
@@ -60,9 +60,4 @@
public void setRhs(ExpressionOperator rhs) {
this.rhs = rhs;
}
-
- // TODO Don't we need something here that hooks lhs and rhs to our inputs in
- // the plan? Extenders of this class, such as Add, are depending on lhs and
- // rhs being set. LogToPhyTranslator is setting inputs. I don't see
- // anywhere connecting them together.
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
@Override
public String name() {
- return "Divide - " + mKey.toString();
+ return "Divide" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -55,7 +55,7 @@
@Override
public String name() {
- return "Equal To - " + mKey.toString();
+ return "Equal To" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -51,7 +52,7 @@
@Override
public String name() {
- return "Greater Than or Equal - " + mKey.toString();
+ return "Greater Than or Equal" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -77,7 +78,7 @@
right = (DataByteArray)res.result;
int ret = left.compareTo(right);
- if(ret==-1 || ret==0){
+ if(ret==1 || ret==0){
res.result = new Boolean(true);
//left = right = null;
return res;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
import org.apache.pig.impl.physicalLayer.POStatus;
@@ -45,7 +46,7 @@
@Override
public String name() {
- return "Greater Than - " + mKey.toString();
+ return "Greater Than" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -75,7 +76,7 @@
right = (DataByteArray) res.result;
int ret = left.compareTo(right);
- if (ret == -1) {
+ if (ret == 1) {
res.result = new Boolean(true);
// left = right = null;
return res;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -49,7 +50,7 @@
@Override
public String name() {
- return "Less Than or Equal - " + mKey.toString();
+ return "Less Than or Equal" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -74,7 +75,7 @@
right = (DataByteArray) res.result;
int ret = left.compareTo(right);
- if (ret == 1 || ret == 0) {
+ if (ret == -1 || ret == 0) {
res.result = new Boolean(true);
//left = right = null;
return res;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
@Override
public String name() {
- return "Less Than - " + mKey.toString();
+ return "Less Than" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -75,7 +76,7 @@
right = (DataByteArray) res.result;
int ret = left.compareTo(right);
- if (ret == 1) {
+ if (ret == -1) {
res.result = new Boolean(true);
//left = right = null;
return res;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
@Override
public String name() {
- return "Mod - " + mKey.toString();
+ return "Mod" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
@Override
public String name() {
- return "Multiply - " + mKey.toString();
+ return "Multiply" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java Thu Jun 19 12:56:00 2008
@@ -55,7 +55,7 @@
@Override
public String name() {
- return "Not Equal To - " + mKey.toString();
+ return "Not Equal To" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
@Override
public String name() {
- return "And - " + mKey.toString();
+ return "And" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java Thu Jun 19 12:56:00 2008
@@ -22,6 +22,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.Result;
@@ -124,7 +125,7 @@
@Override
public String name() {
- return "POBinCond - " + mKey.toString();
+ return "POBinCond" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -28,6 +29,7 @@
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.POStatus;
@@ -41,8 +43,8 @@
* Need the full operator implementation.
*/
public class POCast extends ExpressionOperator {
-
- LoadFunc load;
+ private String loadFSpec;
+ transient private LoadFunc load;
private Log log = LogFactory.getLog(getClass());
private static final long serialVersionUID = 1L;
@@ -57,8 +59,14 @@
// TODO Auto-generated constructor stub
}
- public void setLoad(LoadFunc load) {
- this.load = load;
+ private void instantiateFunc() {
+ if(load!=null) return;
+ this.load = (LoadFunc) PigContext.instantiateFuncFromSpec(this.loadFSpec);
+ }
+
+ public void setLoadFSpec(String fSpec) {
+ this.loadFSpec = fSpec;
+ instantiateFunc();
}
@Override
@@ -69,7 +77,7 @@
@Override
public String name() {
- return "Cast - " + mKey.toString();
+ return "Cast" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -740,5 +748,10 @@
return res;
}
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+ is.defaultReadObject();
+ instantiateFunc();
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java Thu Jun 19 12:56:00 2008
@@ -55,7 +55,7 @@
@Override
public String name() {
// TODO Auto-generated method stub
- return "POIsNull - " + mKey.toString();
+ return "POIsNull" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -187,11 +187,4 @@
}
return res;
}
-
- public void setInput(ExpressionOperator in) {
- this.expr = in;
- }
-
-
-
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java Thu Jun 19 12:56:00 2008
@@ -22,6 +22,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
@@ -63,7 +64,7 @@
@Override
public String name() {
// TODO Auto-generated method stub
- return "POMapLookUp - " + mKey.toString();
+ return "POMapLookUp" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -49,7 +50,7 @@
@Override
public String name() {
// TODO Auto-generated method stub
- return "PONegative - " + mKey.toString();
+ return "PONegative" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -87,11 +88,4 @@
}
return res;
}
-
- public void setInput(ExpressionOperator in) {
- this.expr = in;
- }
-
-
-
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
@Override
public String name() {
- return "Not - " + mKey.toString();
+ return "Not" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -50,7 +51,7 @@
@Override
public String name() {
- return "Or - " + mKey.toString();
+ return "Or" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java Thu Jun 19 12:56:00 2008
@@ -23,6 +23,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
@@ -78,7 +79,7 @@
@Override
public String name() {
- return "Project(" + ((star) ? "*" : column) + ") - " + mKey.toString();
+ return "Project" + "[" + DataType.findTypeName(resultType) + "]" +"(" + ((star) ? "*" : column) + ") - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java Thu Jun 19 12:56:00 2008
@@ -1,5 +1,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.List;
import java.util.Map;
@@ -27,13 +29,13 @@
public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) {
super(k, rp, inp);
this.funcSpec = funcSpec;
- this.func = func;
+ this.func = func;
+ if(func==null)
+ instantiateFunc();
}
public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec) {
this(k, rp, inp, funcSpec, null);
-
- instantiateFunc();
}
private void instantiateFunc() {
@@ -42,8 +44,6 @@
}
public ComparisonFunc getComparator() {
- if (func == null)
- instantiateFunc();
return func;
}
@@ -51,10 +51,6 @@
public Result getNext(Integer i) throws ExecException {
Result result = new Result();
- if (func == null)
- instantiateFunc();
-
-
result.result = func.compare(t1, t2);
result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
: POStatus.STATUS_ERR;
@@ -117,13 +113,15 @@
}
public void attachInput(Tuple t1, Tuple t2) {
- if (func == null)
- instantiateFunc();
-
this.t1 = t1;
this.t2 = t2;
inputAttached = true;
}
+
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+ is.defaultReadObject();
+ instantiateFunc();
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java Thu Jun 19 12:56:00 2008
@@ -19,6 +19,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
@@ -52,9 +53,10 @@
Tuple t1, t2;
private final Log log = LogFactory.getLog(getClass());
String funcSpec;
- private final byte INITIAL = 0;
- private final byte INTERMEDIATE = 1;
- private final byte FINAL = 2;
+ String origFSpec;
+ public static final byte INITIAL = 0;
+ public static final byte INTERMEDIATE = 1;
+ public static final byte FINAL = 2;
public POUserFunc(OperatorKey k, int rp, List inp) {
super(k, rp);
@@ -64,8 +66,6 @@
public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec) {
this(k, rp, inp, funcSpec, null);
-
- instantiateFunc();
}
public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec, EvalFunc func) {
@@ -73,11 +73,13 @@
super(k, rp);
super.setInputs(inp);
this.funcSpec = funcSpec;
+ this.origFSpec = funcSpec;
this.func = func;
+ instantiateFunc(funcSpec);
}
- private void instantiateFunc() {
- this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+ private void instantiateFunc(String fSpec) {
+ this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
this.func.setReporter(reporter);
}
@@ -140,7 +142,6 @@
if(temp.returnStatus!=POStatus.STATUS_OK)
return temp;
((Tuple)res.result).append(temp.result);
-
}
res.returnStatus = temp.returnStatus;
return res;
@@ -148,13 +149,8 @@
}
private Result getNext() throws ExecException {
- Tuple t = null;
- Result result;
- // instantiate the function if its null
- if (func == null)
- instantiateFunc();
-
- result = processInput();
+ Result result = processInput();
+
try {
if(result.returnStatus == POStatus.STATUS_OK) {
result.result = func.exec((Tuple) result.result);
@@ -239,25 +235,22 @@
// func is being changed.
switch (Function) {
case INITIAL:
- func = (EvalFunc) PigContext.instantiateFuncFromSpec(getInitial());
- setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+ funcSpec = getInitial();
break;
case INTERMEDIATE:
- func = (EvalFunc) PigContext.instantiateFuncFromSpec(getIntermed());
- setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+ funcSpec = getIntermed();
break;
case FINAL:
- func = (EvalFunc) PigContext.instantiateFuncFromSpec(getFinal());
- setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+ funcSpec = getFinal();
break;
}
+ instantiateFunc(funcSpec);
+ setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
}
public String getInitial() {
- if (func == null)
- instantiateFunc();
-
+ instantiateFunc(origFSpec);
if (func instanceof Algebraic) {
return ((Algebraic) func).getInitial();
} else {
@@ -268,9 +261,7 @@
}
public String getIntermed() {
- if (func == null)
- instantiateFunc();
-
+ instantiateFunc(origFSpec);
if (func instanceof Algebraic) {
return ((Algebraic) func).getIntermed();
} else {
@@ -281,9 +272,7 @@
}
public String getFinal() {
- if (func == null)
- instantiateFunc();
-
+ instantiateFunc(origFSpec);
if (func instanceof Algebraic) {
return ((Algebraic) func).getFinal();
} else {
@@ -294,39 +283,24 @@
}
public Type getReturnType() {
- if (func == null)
- instantiateFunc();
-
return func.getReturnType();
}
public void finish() {
- if (func == null)
- instantiateFunc();
-
func.finish();
}
public Schema outputSchema(Schema input) {
- if (func == null)
- instantiateFunc();
-
return func.outputSchema(input);
}
public Boolean isAsynchronous() {
- if (func == null)
- instantiateFunc();
-
return func.isAsynchronous();
}
@Override
public String name() {
- if(funcSpec!=null)
- return "POUserFunc" + "(" + funcSpec + ")" + " - " + mKey.toString();
- else
- return "POUserFunc" + "(" + "DummySpec" + ")" + " - " + mKey.toString();
+ return "POUserFunc" + "(" + func.getClass().getName() + ")" + " - " + mKey.toString();
}
@Override
@@ -350,5 +324,9 @@
public String getFuncSpec() {
return funcSpec;
}
-
+
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+ is.defaultReadObject();
+ instantiateFunc(funcSpec);
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.physicalLayer.expressionOperators;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
@@ -46,7 +47,7 @@
@Override
public String name() {
- return "Subtract - " + mKey.toString();
+ return "Subtract" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java Thu Jun 19 12:56:00 2008
@@ -26,6 +26,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -104,7 +105,7 @@
@Override
public String name() {
// TODO Auto-generated method stub
- return "PODistinct - " + mKey.toString();
+ return "PODistinct" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java Thu Jun 19 12:56:00 2008
@@ -146,7 +146,7 @@
@Override
public String name() {
- return "Filter - " + mKey.toString();
+ return "Filter" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java Thu Jun 19 12:56:00 2008
@@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.IndexedTuple;
@@ -81,7 +82,7 @@
@Override
public String name() {
- return "For Each - " + mKey.toString();
+ return "For Each" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java Thu Jun 19 12:56:00 2008
@@ -20,6 +20,7 @@
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -63,7 +64,7 @@
@Override
public String name() {
- return "Global Rearrange - " + mKey.toString();
+ return "Global Rearrange" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jun 19 12:56:00 2008
@@ -87,7 +87,7 @@
@Override
public String name() {
- return "Local Rearrange - " + mKey.toString();
+ return "Local Rearrange" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java Thu Jun 19 12:56:00 2008
@@ -25,6 +25,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -93,7 +94,7 @@
@Override
public String name() {
- return "Package - " + mKey.toString();
+ return "Package" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Thu Jun 19 12:56:00 2008
@@ -17,6 +17,7 @@
*/
package org.apache.pig.impl.physicalLayer.relationalOperators;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
@@ -29,6 +30,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.POStatus;
@@ -54,7 +56,11 @@
*/
public class POSort extends PhysicalOperator<PhyPlanVisitor> {
- //private List<Integer> mSortCols;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ //private List<Integer> mSortCols;
private List<ExprPlan> sortPlans;
private List<Byte> ExprOutputTypes;
private List<Boolean> mAscCols;
@@ -108,8 +114,13 @@
}
- public class SortComparator implements Comparator<Tuple> {
- public int compare(Tuple o1, Tuple o2) {
+ public class SortComparator implements Comparator<Tuple>,Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public int compare(Tuple o1, Tuple o2) {
int count = 0;
int ret = 0;
if(sortPlans == null || sortPlans.size() == 0)
@@ -165,9 +176,14 @@
}
}
- public class UDFSortComparator implements Comparator<Tuple> {
+ public class UDFSortComparator implements Comparator<Tuple>,Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
- public int compare(Tuple t1, Tuple t2) {
+ public int compare(Tuple t1, Tuple t2) {
mSortFunc.attachInput(t1, t2);
Integer i = null;
@@ -190,7 +206,7 @@
@Override
public String name() {
- return "POSort - " + mKey.toString();
+ return "POSort" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
@@ -220,13 +236,13 @@
}
if (it == null) {
- it = sortedBag.iterator();
- }
- res.result = it.next();
- if (res.result == null)
- res.returnStatus = POStatus.STATUS_EOP;
- else
- res.returnStatus = POStatus.STATUS_OK;
+ it = sortedBag.iterator();
+ }
+ if (it.hasNext()) {
+ res.result = it.next();
+ res.returnStatus = POStatus.STATUS_OK;
+ } else
+ res.returnStatus = POStatus.STATUS_EOP;
return res;
}
@@ -264,4 +280,8 @@
mSortFunc = sortFunc;
}
+ public List<Boolean> getMAscCols() {
+ return mAscCols;
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java Thu Jun 19 12:56:00 2008
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -82,7 +83,7 @@
@Override
public String name() {
- return "Union - " + mKey.toString();
+ return "Union" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
@Override
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Thu Jun 19 12:56:00 2008
@@ -270,6 +270,7 @@
pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
Iterator<Tuple> iter = pigServer.openIterator("answer");
+ if(!iter.hasNext()) fail("No Output received");
while(iter.hasNext()){
Tuple t = iter.next();
assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
lt.setValue(inpba1);
rt.setValue(inpba2);
Result resba = g.getNext(inpba1);
- boolean retba = (inpba1.compareTo(inpba2) == -1 || inpba1
+ boolean retba = (inpba1.compareTo(inpba2) == 1 || inpba1
.compareTo(inpba2) == 0);
if ((Boolean) resba.result == retba)
return true;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
lt.setValue(inpba1);
rt.setValue(inpba2);
Result resba = g.getNext(inpba1);
- boolean retba = (inpba1.compareTo(inpba2) == -1);
+ boolean retba = (inpba1.compareTo(inpba2) == 1);
if ((Boolean) resba.result == retba)
return true;
return false;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
lt.setValue(inpba1);
rt.setValue(inpba2);
Result resba = g.getNext(inpba1);
- boolean retba = (inpba1.compareTo(inpba2) == 1 || inpba1
+ boolean retba = (inpba1.compareTo(inpba2) == -1 || inpba1
.compareTo(inpba2) == 0);
if ((Boolean) resba.result == retba)
return true;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java Thu Jun 19 12:56:00 2008
@@ -84,7 +84,7 @@
lt.setValue(inpba1);
rt.setValue(inpba2);
Result resba = g.getNext(inpba1);
- boolean retba = (inpba1.compareTo(inpba2) == 1);
+ boolean retba = (inpba1.compareTo(inpba2) == -1);
if ((Boolean) resba.result == retba)
return true;
return false;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java Thu Jun 19 12:56:00 2008
@@ -51,7 +51,7 @@
ConstantExpression lt = (ConstantExpression) GenPhyOp.exprConst();
lt.setResultType(type);
POIsNull isNullExpr = (POIsNull) GenPhyOp.compIsNullExpr();
- isNullExpr.setInput(lt);
+ isNullExpr.setExpr(lt);
Object inp1;
Result res;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Thu Jun 19 12:56:00 2008
@@ -662,7 +662,7 @@
public void testByteArrayToOther() throws PlanException, ExecException {
POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
LoadFunc load = new TestLoader();
- op.setLoad(load);
+ op.setLoadFSpec(load.getClass().getName());
POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
ExprPlan plan = new ExprPlan();
plan.add(prj);
Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld Thu Jun 19 12:56:00 2008
@@ -1,26 +1,26 @@
-For Each - Test-Plan-Builder-88
+For Each[tuple] - Test-Plan-Builder-88
| |
| POGenerate(false,false,false) - Test-Plan-Builder-87
| | |
-| | Add - Test-Plan-Builder-80
+| | Add[Unknown] - Test-Plan-Builder-80
| | |
-| | |---Add - Test-Plan-Builder-78
+| | |---Add[Unknown] - Test-Plan-Builder-78
| | | |
-| | | |---Project(0) - Test-Plan-Builder-76
+| | | |---Project[bytearray](0) - Test-Plan-Builder-76
| | | |
-| | | |---Project(1) - Test-Plan-Builder-77
+| | | |---Project[bytearray](1) - Test-Plan-Builder-77
| | |
| | |---Constant(5) - Test-Plan-Builder-79
| | |
-| | Subtract - Test-Plan-Builder-85
+| | Subtract[Unknown] - Test-Plan-Builder-85
| | |
-| | |---Subtract - Test-Plan-Builder-83
+| | |---Subtract[Unknown] - Test-Plan-Builder-83
| | | |
-| | | |---Project(0) - Test-Plan-Builder-81
+| | | |---Project[bytearray](0) - Test-Plan-Builder-81
| | | |
| | | |---Constant(5) - Test-Plan-Builder-82
| | |
-| | |---Project(1) - Test-Plan-Builder-84
+| | |---Project[bytearray](1) - Test-Plan-Builder-84
| | |
| | Constant(hello) - Test-Plan-Builder-86
|
Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld Thu Jun 19 12:56:00 2008
@@ -1,25 +1,25 @@
-For Each - Test-Plan-Builder-140
+For Each[tuple] - Test-Plan-Builder-140
| |
| POGenerate(false) - Test-Plan-Builder-139
| | |
-| | POBinCond - Test-Plan-Builder-138
+| | POBinCond[Unknown] - Test-Plan-Builder-138
| | |
-| | |---Equal To - Test-Plan-Builder-131
+| | |---Equal To[tuple] - Test-Plan-Builder-131
| | | |
-| | | |---Project(1) - Test-Plan-Builder-129
+| | | |---Project[bytearray](1) - Test-Plan-Builder-129
| | | |
| | | |---Constant(3) - Test-Plan-Builder-130
| | |
-| | |---Add - Test-Plan-Builder-134
+| | |---Add[Unknown] - Test-Plan-Builder-134
| | | |
-| | | |---Project(2) - Test-Plan-Builder-132
+| | | |---Project[bytearray](2) - Test-Plan-Builder-132
| | | |
-| | | |---Project(3) - Test-Plan-Builder-133
+| | | |---Project[bytearray](3) - Test-Plan-Builder-133
| | |
-| | |---Subtract - Test-Plan-Builder-137
+| | |---Subtract[Unknown] - Test-Plan-Builder-137
| | |
-| | |---Project(2) - Test-Plan-Builder-135
+| | |---Project[bytearray](2) - Test-Plan-Builder-135
| | |
-| | |---Project(3) - Test-Plan-Builder-136
+| | |---Project[bytearray](3) - Test-Plan-Builder-136
|
|---Load(a:org.apache.pig.builtin.PigStorage()) - Test-Plan-Builder-128
\ No newline at end of file