You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2011/01/07 23:16:23 UTC
svn commit: r1056536 [2/2] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/scripting/ src/org/apache/pig/scripting/jython/
src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jan 7 22:16:22 2011
@@ -17,12 +17,9 @@
*/
package org.apache.pig.tools.pigstats;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -30,30 +27,17 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigRunner.ReturnCode;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.newplan.BaseOperatorPlan;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.tools.pigstats.JobStats.JobState;
+import org.apache.pig.tools.pigstats.SimplePigStats.JobGraphPrinter;
/**
* PigStats encapsulates the statistics collected from a running script.
@@ -63,203 +47,45 @@ import org.apache.pig.tools.pigstats.Job
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public final class PigStats {
+public abstract class PigStats {
private static final Log LOG = LogFactory.getLog(PigStats.class);
- private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-
private static ThreadLocal<PigStats> tps = new ThreadLocal<PigStats>();
- private PigContext pigContext;
-
- private JobClient jobClient;
-
- private JobControlCompiler jcc;
-
- private JobGraph jobPlan;
-
- // map MR job id to MapReduceOper
- private Map<String, MapReduceOper> jobMroMap;
-
- private Map<MapReduceOper, JobStats> mroJobMap;
-
- private long startTime = -1;
- private long endTime = -1;
+ protected int returnCode = ReturnCode.UNKNOWN;
- private String userId;
-
- private int returnCode = ReturnCode.UNKNOWN;
private String errorMessage;
private int errorCode = -1;
public static PigStats get() {
- if (tps.get() == null) tps.set(new PigStats());
- return tps.get();
- }
-
- static PigStats start() {
- tps.set(new PigStats());
+ if (tps.get() == null) tps.set(new SimplePigStats());
return tps.get();
}
- /**
- * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
- */
- public static class JobGraph extends BaseOperatorPlan implements Iterable<JobStats>{
-
- @Override
- public String toString() {
- JobGraphPrinter jp = new JobGraphPrinter(this);
- try {
- jp.visit();
- } catch (FrontendException e) {
- LOG.warn("unable to print job plan", e);
- }
- return jp.toString();
- }
-
- public Iterator<JobStats> iterator() {
- return new Iterator<JobStats>() {
- private Iterator<Operator> iter = getOperators();
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
- @Override
- public JobStats next() {
- return (JobStats)iter.next();
- }
- @Override
- public void remove() {}
- };
- }
-
- boolean isConnected(Operator from, Operator to) {
- List<Operator> succs = null;
- succs = getSuccessors(from);
- if (succs != null) {
- for (Operator succ: succs) {
- if (succ.getName().equals(to.getName())
- || isConnected(succ, to)) {
- return true;
- }
- }
- }
- return false;
- }
-
- List<JobStats> getSuccessfulJobs() {
- ArrayList<JobStats> lst = new ArrayList<JobStats>();
- Iterator<JobStats> iter = iterator();
- while (iter.hasNext()) {
- JobStats js = iter.next();
- if (js.getState() == JobState.SUCCESS) {
- lst.add(js);
- }
- }
- Collections.sort(lst, new JobComparator());
- return lst;
- }
-
- List<JobStats> getFailedJobs() {
- ArrayList<JobStats> lst = new ArrayList<JobStats>();
- Iterator<JobStats> iter = iterator();
- while (iter.hasNext()) {
- JobStats js = iter.next();
- if (js.getState() == JobState.FAILED) {
- lst.add(js);
- }
- }
- return lst;
- }
- }
-
- /**
- * This class builds the job DAG from a MR plan
- */
- private class JobGraphBuilder extends MROpPlanVisitor {
-
- public JobGraphBuilder(MROperPlan plan) {
- super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
- plan));
- jobPlan = new JobGraph();
- mroJobMap = new HashMap<MapReduceOper, JobStats>();
- }
-
- @Override
- public void visitMROp(MapReduceOper mr) throws VisitorException {
- JobStats js = new JobStats(
- mr.getOperatorKey().toString(), jobPlan);
- jobPlan.add(js);
- List<MapReduceOper> preds = getPlan().getPredecessors(mr);
- if (preds != null) {
- for (MapReduceOper pred : preds) {
- JobStats jpred = mroJobMap.get(pred);
- if (!jobPlan.isConnected(jpred, js)) {
- jobPlan.connect(jpred, js);
- }
- }
- }
- mroJobMap.put(mr, js);
- }
+ static void set(PigStats stats) {
+ tps.set(stats);
}
-
- /**
- * This class prints a JobGraph
- */
- static class JobGraphPrinter extends PlanVisitor {
-
- StringBuffer buf;
-
- protected JobGraphPrinter(OperatorPlan plan) {
- super(plan,
- new org.apache.pig.newplan.DependencyOrderWalker(
- plan));
- buf = new StringBuffer();
- }
-
- public void visit(JobStats op) throws FrontendException {
- buf.append(op.getJobId());
- List<Operator> succs = plan.getSuccessors(op);
- if (succs != null) {
- buf.append("\t->\t");
- for (Operator p : succs) {
- buf.append(((JobStats)p).getJobId()).append(",");
- }
- }
- buf.append("\n");
- }
- @Override
- public String toString() {
- buf.append("\n");
- return buf.toString();
- }
- }
-
- private static class JobComparator implements Comparator<JobStats> {
- @Override
- public int compare(JobStats o1, JobStats o2) {
- return o1.getJobId().compareTo(o2.getJobId());
- }
- }
-
- public boolean isSuccessful() {
- return (returnCode == ReturnCode.SUCCESS);
+ static PigStats start() {
+ tps.set(new SimplePigStats());
+ return tps.get();
}
/**
- * Return codes are defined in {@link ReturnCode}
+ * Return codes are defined in {@link ReturnCode}
*/
public int getReturnCode() {
return returnCode;
}
+ /**
+ * Returns the error message, or {@code null} if none has been set
+ */
public String getErrorMessage() {
return errorMessage;
}
-
+
/**
* Returns the error code of {@link PigException}
*/
@@ -267,156 +93,77 @@ public final class PigStats {
return errorCode;
}
+ public abstract boolean isEmbedded();
+
+ public abstract boolean isSuccessful();
+
+ public abstract Map<String, List<PigStats>> getAllStats();
+
+ public abstract List<String> getAllErrorMessages();
+
/**
* Returns the properties associated with the script
*/
- public Properties getPigProperties() {
- if (pigContext == null) return null;
- return pigContext.getProperties();
- }
+ public abstract Properties getPigProperties();
/**
* Returns the DAG of the MR jobs spawned by the script
*/
- public JobGraph getJobGraph() {
- return jobPlan;
- }
+ public abstract JobGraph getJobGraph();
/**
* Returns the list of output locations in the script
*/
- public List<String> getOutputLocations() {
- ArrayList<String> locations = new ArrayList<String>();
- for (OutputStats output : getOutputStats()) {
- locations.add(output.getLocation());
- }
- return Collections.unmodifiableList(locations);
- }
+ public abstract List<String> getOutputLocations();
/**
* Returns the list of output names in the script
*/
- public List<String> getOutputNames() {
- ArrayList<String> names = new ArrayList<String>();
- for (OutputStats output : getOutputStats()) {
- names.add(output.getName());
- }
- return Collections.unmodifiableList(names);
- }
+ public abstract List<String> getOutputNames();
/**
* Returns the number of bytes for the given output location,
* -1 for invalid location or name.
*/
- public long getNumberBytes(String location) {
- if (location == null) return -1;
- String name = new Path(location).getName();
- long count = -1;
- for (OutputStats output : getOutputStats()) {
- if (name.equals(output.getName())) {
- count = output.getBytes();
- break;
- }
- }
- return count;
- }
+ public abstract long getNumberBytes(String location);
/**
* Returns the number of records for the given output location,
* -1 for invalid location or name.
*/
- public long getNumberRecords(String location) {
- if (location == null) return -1;
- String name = new Path(location).getName();
- long count = -1;
- for (OutputStats output : getOutputStats()) {
- if (name.equals(output.getName())) {
- count = output.getNumberRecords();
- break;
- }
- }
- return count;
- }
+ public abstract long getNumberRecords(String location);
/**
* Returns the alias associated with this output location
*/
- public String getOutputAlias(String location) {
- if (location == null) return null;
- String name = new Path(location).getName();
- String alias = null;
- for (OutputStats output : getOutputStats()) {
- if (name.equals(output.getName())) {
- alias = output.getAlias();
- break;
- }
- }
- return alias;
- }
+ public abstract String getOutputAlias(String location);
/**
* Returns the total spill counts from {@link SpillableMemoryManager}.
*/
- public long getSMMSpillCount() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
- while (it.hasNext()) {
- ret += it.next().getSMMSpillCount();
- }
- return ret;
- }
+ public abstract long getSMMSpillCount();
/**
* Returns the total number of bags that spilled proactively
*/
- public long getProactiveSpillCountObjects() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
- while (it.hasNext()) {
- ret += it.next().getProactiveSpillCountObjects();
- }
- return ret;
- }
+ public abstract long getProactiveSpillCountObjects();
/**
* Returns the total number of records that spilled proactively
*/
- public long getProactiveSpillCountRecords() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
- while (it.hasNext()) {
- ret += it.next().getProactiveSpillCountRecs();
- }
- return ret;
- }
+ public abstract long getProactiveSpillCountRecords();
/**
* Returns the total bytes written to user specified HDFS
* locations of this script.
*/
- public long getBytesWritten() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
- while (it.hasNext()) {
- long n = it.next().getBytesWritten();
- if (n > 0) ret += n;
- }
- return ret;
- }
+ public abstract long getBytesWritten();
/**
* Returns the total number of records in user specified output
* locations of this script.
*/
- public long getRecordWritten() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
- while (it.hasNext()) {
- long n = it.next().getRecordWrittern();
- if (n > 0) ret += n;
- }
- return ret;
- }
+ public abstract long getRecordWritten();
public String getHadoopVersion() {
return ScriptState.get().getHadoopVersion();
@@ -426,291 +173,110 @@ public final class PigStats {
return ScriptState.get().getPigVersion();
}
- public String getScriptId() {
- return ScriptState.get().getId();
- }
+ public abstract String getScriptId();
- public String getFeatures() {
- return ScriptState.get().getScriptFeatures();
- }
+ public abstract String getFeatures();
- public long getDuration() {
- return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
- }
+ public abstract long getDuration();
/**
* Returns the number of MR jobs for this script
*/
- public int getNumberJobs() {
- return jobPlan.size();
- }
-
- public List<OutputStats> getOutputStats() {
- List<OutputStats> outputs = new ArrayList<OutputStats>();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- for (OutputStats os : iter.next().getOutputs()) {
- outputs.add(os);
- }
- }
- return Collections.unmodifiableList(outputs);
- }
-
- public List<InputStats> getInputStats() {
- List<InputStats> inputs = new ArrayList<InputStats>();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- for (InputStats is : iter.next().getInputs()) {
- inputs.add(is);
- }
- }
- return Collections.unmodifiableList(inputs);
- }
-
- private PigStats() {
- jobMroMap = new HashMap<String, MapReduceOper>();
- jobPlan = new JobGraph();
- }
-
- void start(PigContext pigContext, JobClient jobClient,
- JobControlCompiler jcc, MROperPlan mrPlan) {
-
- if (pigContext == null || jobClient == null || jcc == null) {
- LOG.warn("invalid params: " + pigContext + jobClient + jcc);
- return;
- }
+ public abstract int getNumberJobs();
- this.pigContext = pigContext;
- this.jobClient = jobClient;
- this.jcc = jcc;
-
- // build job DAG with job ids assigned to null
- try {
- new JobGraphBuilder(mrPlan).visit();
- } catch (VisitorException e) {
- LOG.warn("unable to build job plan", e);
- }
-
- startTime = System.currentTimeMillis();
- userId = System.getProperty("user.name");
- }
+ public abstract List<OutputStats> getOutputStats();
- void stop() {
- endTime = System.currentTimeMillis();
- int m = getNumberSuccessfulJobs();
- int n = getNumberFailedJobs();
-
- if (n == 0 && m > 0 && m == jobPlan.size()) {
- returnCode = ReturnCode.SUCCESS;
- } else if (m > 0 && m < jobPlan.size()) {
- returnCode = ReturnCode.PARTIAL_FAILURE;
- } else {
- returnCode = ReturnCode.FAILURE;
- }
- }
-
- boolean isInitialized() {
- return startTime > 0;
- }
+ public abstract OutputStats result(String alias);
- JobClient getJobClient() {
- return jobClient;
- }
+ public abstract List<InputStats> getInputStats();
- JobControlCompiler getJobControlCompiler() {
- return jcc;
+ void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
}
- void setReturnCode(int returnCode) {
- this.returnCode = returnCode;
- }
-
- @SuppressWarnings("deprecation")
- JobStats addJobStats(Job job) {
- MapReduceOper mro = null;
- JobID jobId = job.getAssignedJobID();
- if (jobId != null) {
- mro = jobMroMap.get(jobId.toString());
- } else {
- mro = jobMroMap.get(job.toString());
- }
- if (mro == null) {
- LOG.warn("unable to get MR oper for job: "
- + ((jobId == null) ? job.toString() : jobId.toString()));
- return null;
+ void setErrorCode(int errorCode) {
+ this.errorCode = errorCode;
+ }
+ /**
+ * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
+ */
+ public static class JobGraph extends BaseOperatorPlan implements Iterable<JobStats>{
+
+ @Override
+ public String toString() {
+ JobGraphPrinter jp = new JobGraphPrinter(this);
+ try {
+ jp.visit();
+ } catch (FrontendException e) {
+ LOG.warn("unable to print job plan", e);
+ }
+ return jp.toString();
}
- JobStats js = mroJobMap.get(mro);
-
- js.setAlias(mro);
- js.setConf(job.getJobConf());
- return js;
- }
-
- @SuppressWarnings("deprecation")
- public JobStats addJobStatsForNative(NativeMapReduceOper mr) {
- JobStats js = mroJobMap.get(mr);
- js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber()));
- js.setAlias(mr);
- return js;
- }
-
- void display() {
- if (returnCode == ReturnCode.UNKNOWN) {
- LOG.warn("unknown return code, can't display the results");
- return;
- }
- if (pigContext == null) {
- LOG.warn("unknown exec type, don't display the results");
- return;
+ public Iterator<JobStats> iterator() {
+ return new Iterator<JobStats>() {
+ private Iterator<Operator> iter = getOperators();
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+ @Override
+ public JobStats next() {
+ return (JobStats)iter.next();
+ }
+ @Override
+ public void remove() {}
+ };
}
- // currently counters are not working in local mode - see PIG-1286
- ExecType execType = pigContext.getExecType();
- if (execType == ExecType.LOCAL) {
- LOG.info("Detected Local mode. Stats reported below may be incomplete");
- }
-
- SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
- StringBuilder sb = new StringBuilder();
- sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
- sb.append(getHadoopVersion()).append("\t").append(getPigVersion()).append("\t")
- .append(userId).append("\t")
- .append(sdf.format(new Date(startTime))).append("\t")
- .append(sdf.format(new Date(endTime))).append("\t")
- .append(getFeatures()).append("\n");
- sb.append("\n");
- if (returnCode == ReturnCode.SUCCESS) {
- sb.append("Success!\n");
- } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
- sb.append("Some jobs have failed! Stop running all dependent jobs\n");
- } else {
- sb.append("Failed!\n");
- }
- sb.append("\n");
-
- if (returnCode == ReturnCode.SUCCESS
- || returnCode == ReturnCode.PARTIAL_FAILURE) {
- sb.append("Job Stats (time in seconds):\n");
- if (execType == ExecType.LOCAL) {
- sb.append(JobStats.SUCCESS_HEADER_LOCAL).append("\n");
- } else {
- sb.append(JobStats.SUCCESS_HEADER).append("\n");
- }
- List<JobStats> arr = jobPlan.getSuccessfulJobs();
- for (JobStats js : arr) {
- sb.append(js.getDisplayString(execType == ExecType.LOCAL));
- }
- sb.append("\n");
- }
- if (returnCode == ReturnCode.FAILURE
- || returnCode == ReturnCode.PARTIAL_FAILURE) {
- sb.append("Failed Jobs:\n");
- sb.append(JobStats.FAILURE_HEADER).append("\n");
- List<JobStats> arr = jobPlan.getFailedJobs();
- for (JobStats js : arr) {
- sb.append(js.getDisplayString(execType == ExecType.LOCAL));
+ boolean isConnected(Operator from, Operator to) {
+ List<Operator> succs = null;
+ succs = getSuccessors(from);
+ if (succs != null) {
+ for (Operator succ: succs) {
+ if (succ.getName().equals(to.getName())
+ || isConnected(succ, to)) {
+ return true;
+ }
+ }
}
- sb.append("\n");
- }
- sb.append("Input(s):\n");
- for (InputStats is : getInputStats()) {
- sb.append(is.getDisplayString(execType == ExecType.LOCAL));
- }
- sb.append("\n");
- sb.append("Output(s):\n");
- for (OutputStats ds : getOutputStats()) {
- sb.append(ds.getDisplayString(execType == ExecType.LOCAL));
- }
-
- if (execType != ExecType.LOCAL) {
- sb.append("\nCounters:\n");
- sb.append("Total records written : " + getRecordWritten()).append("\n");
- sb.append("Total bytes written : " + getBytesWritten()).append("\n");
- sb.append("Spillable Memory Manager spill count : "
- + getSMMSpillCount()).append("\n");
- sb.append("Total bags proactively spilled: "
- + getProactiveSpillCountObjects()).append("\n");
- sb.append("Total records proactively spilled: "
- + getProactiveSpillCountRecords()).append("\n");
+ return false;
}
- sb.append("\nJob DAG:\n").append(jobPlan.toString());
-
- LOG.info("Script Statistics: \n" + sb.toString());
- }
-
- @SuppressWarnings("deprecation")
- void mapMROperToJob(MapReduceOper mro, Job job) {
- if (mro == null) {
- LOG.warn("null MR operator");
- } else {
- JobStats js = mroJobMap.get(mro);
- if (js == null) {
- LOG.warn("null job stats for mro: " + mro.getOperatorKey());
- } else {
- JobID id = job.getAssignedJobID();
- js.setId(id);
- if (id != null) {
- jobMroMap.put(id.toString(), mro);
- } else {
- jobMroMap.put(job.toString(), mro);
+ List<JobStats> getSuccessfulJobs() {
+ ArrayList<JobStats> lst = new ArrayList<JobStats>();
+ Iterator<JobStats> iter = iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ if (js.getState() == JobState.SUCCESS) {
+ lst.add(js);
}
}
- }
- }
-
- void setErrorMessage(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- void setErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
-
- void setBackendException(Job job, Exception e) {
- if (e instanceof PigException) {
- LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": "
- + e.getLocalizedMessage());
- } else if (e != null) {
- LOG.error("ERROR: " + e.getLocalizedMessage());
+ Collections.sort(lst, new JobComparator());
+ return lst;
}
- if (job.getAssignedJobID() == null || e == null) {
- LOG.debug("unable to set backend exception");
- return;
- }
- String id = job.getAssignedJobID().toString();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- JobStats js = iter.next();
- if (id.equals(js.getJobId())) {
- js.setBackendException(e);
- break;
- }
+ List<JobStats> getFailedJobs() {
+ ArrayList<JobStats> lst = new ArrayList<JobStats>();
+ Iterator<JobStats> iter = iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ if (js.getState() == JobState.FAILED) {
+ lst.add(js);
+ }
+ }
+ return lst;
}
- }
-
- PigContext getPigContext() {
- return pigContext;
- }
+ }
- int getNumberSuccessfulJobs() {
- Iterator<JobStats> iter = jobPlan.iterator();
- int count = 0;
- while (iter.hasNext()) {
- if (iter.next().getState() == JobState.SUCCESS) count++;
- }
- return count;
- }
+ private static class JobComparator implements Comparator<JobStats> {
+ @Override
+ public int compare(JobStats o1, JobStats o2) {
+ return o1.getJobId().compareTo(o2.getJobId());
+ }
+ }
- int getNumberFailedJobs() {
- Iterator<JobStats> iter = jobPlan.iterator();
- int count = 0;
- while (iter.hasNext()) {
- if (iter.next().getState() == JobState.FAILED) count++;
- }
- return count;
+ void setReturnCode(int returnCode) {
+ this.returnCode = returnCode;
}
-
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Jan 7 22:16:22 2011
@@ -19,15 +19,13 @@
package org.apache.pig.tools.pigstats;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
@@ -162,7 +160,7 @@ public abstract class PigStatsUtil {
*/
public static void startCollection(PigContext pc, JobClient client,
JobControlCompiler jcc, MROperPlan plan) {
- PigStats ps = PigStats.start();
+ SimplePigStats ps = (SimplePigStats)PigStats.start();
ps.start(pc, client, jcc, plan);
ScriptState.get().emitLaunchStartedNotification(plan.size());
@@ -175,7 +173,7 @@ public abstract class PigStatsUtil {
* file at INFO level
*/
public static void stopCollection(boolean display) {
- PigStats ps = PigStats.get();
+ SimplePigStats ps = (SimplePigStats)PigStats.get();
ps.stop();
if (!ps.isSuccessful()) {
LOG.error(ps.getNumberFailedJobs() + " map reduce job(s) failed!");
@@ -214,7 +212,7 @@ public abstract class PigStatsUtil {
* Logs the statistics in the Pig log file at INFO level
*/
public static void displayStatistics() {
- PigStats.get().display();
+ ((SimplePigStats)PigStats.get()).display();
}
/**
@@ -226,7 +224,7 @@ public abstract class PigStatsUtil {
* @param jobMroMap the map that maps {@link Job}s to {@link MapReduceOper}s
*/
public static void updateJobMroMap(Map<Job, MapReduceOper> jobMroMap) {
- PigStats ps = PigStats.get();
+ SimplePigStats ps = (SimplePigStats)PigStats.get();
for (Map.Entry<Job, MapReduceOper> entry : jobMroMap.entrySet()) {
MapReduceOper mro = entry.getValue();
ps.mapMROperToJob(mro, entry.getKey());
@@ -239,7 +237,7 @@ public abstract class PigStatsUtil {
* @param jc the job control
*/
public static void accumulateStats(JobControl jc) {
- PigStats ps = PigStats.get();
+ SimplePigStats ps = (SimplePigStats)PigStats.get();
ScriptState ss = ScriptState.get();
for (Job job : jc.getSuccessfulJobs()) {
@@ -269,7 +267,7 @@ public abstract class PigStatsUtil {
}
public static void setBackendException(Job job, Exception e) {
- PigStats.get().setBackendException(job, e);
+ ((SimplePigStats)PigStats.get()).setBackendException(job, e);
}
private static Pattern pattern = Pattern.compile("tmp(-)?[\\d]{1,10}$");
@@ -279,7 +277,7 @@ public abstract class PigStatsUtil {
return result.find();
}
- private static JobStats addFailedJobStats(PigStats ps, Job job) {
+ private static JobStats addFailedJobStats(SimplePigStats ps, Job job) {
JobStats js = ps.addJobStats(job);
if (js == null) {
LOG.warn("unable to add failed job stats");
@@ -296,9 +294,17 @@ public abstract class PigStatsUtil {
return addNativeJobStats(ps, mr, success, null);
}
+ public static void setStatsMap(Map<String, List<PigStats>> statsMap) {
+ EmbeddedPigStats stats = new EmbeddedPigStats(statsMap);
+ PigStats.set(stats);
+ }
+
public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
boolean success, Exception e) {
- JobStats js = ps.addJobStatsForNative(mr);
+ if (ps.isEmbedded()) {
+ throw new IllegalArgumentException();
+ }
+ JobStats js = ((SimplePigStats)ps).addJobStatsForNative(mr);
if(js == null) {
LOG.warn("unable to add native job stats");
} else {
@@ -309,7 +315,7 @@ public abstract class PigStatsUtil {
return js;
}
- private static JobStats accumulateSuccessStatistics(PigStats ps, Job job) {
+ private static JobStats accumulateSuccessStatistics(SimplePigStats ps, Job job) {
JobStats js = ps.addJobStats(job);
if (js == null) {
LOG.warn("unable to add job stats");
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Jan 7 22:16:22 2011
@@ -65,6 +65,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LODistinct;
@@ -177,15 +178,18 @@ public class ScriptState {
private long scriptFeatures;
+ private PigContext pigContext;
+
private Map<MapReduceOper, String> featureMap = null;
private Map<MapReduceOper, String> aliasMap = null;
private List<PigProgressNotificationListener> listeners
= new ArrayList<PigProgressNotificationListener>();
- public static ScriptState start(String commandLine) {
+ public static ScriptState start(String commandLine, PigContext pigContext) {
ScriptState ss = new ScriptState(UUID.randomUUID().toString());
ss.setCommandLine(commandLine);
+ ss.setPigContext(pigContext);
tss.set(ss);
return ss;
}
@@ -197,7 +201,7 @@ public class ScriptState {
public static ScriptState get() {
if (tss.get() == null) {
- ScriptState.start("");
+ ScriptState.start("", null);
}
return tss.get();
}
@@ -205,52 +209,56 @@ public class ScriptState {
public void registerListener(PigProgressNotificationListener listener) {
listeners.add(listener);
}
+
+ public List<PigProgressNotificationListener> getAllListeners() {
+ return listeners;
+ }
public void emitLaunchStartedNotification(int numJobsToLaunch) {
for (PigProgressNotificationListener listener: listeners) {
- listener.launchStartedNotification(numJobsToLaunch);
+ listener.launchStartedNotification(id, numJobsToLaunch);
}
}
- public void emitJobsSubmittedNotification(int numJobsSubmitted) {
+ public void emitJobsSubmittedNotification(int numJobsSubmitted) {
for (PigProgressNotificationListener listener: listeners) {
- listener.jobsSubmittedNotification(numJobsSubmitted);
+ listener.jobsSubmittedNotification(id, numJobsSubmitted);
}
}
public void emitJobStartedNotification(String assignedJobId) {
for (PigProgressNotificationListener listener: listeners) {
- listener.jobStartedNotification(assignedJobId);
+ listener.jobStartedNotification(id, assignedJobId);
}
}
public void emitjobFinishedNotification(JobStats jobStats) {
for (PigProgressNotificationListener listener: listeners) {
- listener.jobFinishedNotification(jobStats);
+ listener.jobFinishedNotification(id, jobStats);
}
}
public void emitJobFailedNotification(JobStats jobStats) {
for (PigProgressNotificationListener listener: listeners) {
- listener.jobFailedNotification(jobStats);
+ listener.jobFailedNotification(id, jobStats);
}
}
public void emitOutputCompletedNotification(OutputStats outputStats) {
for (PigProgressNotificationListener listener: listeners) {
- listener.outputCompletedNotification(outputStats);
+ listener.outputCompletedNotification(id, outputStats);
}
}
public void emitProgressUpdatedNotification(int progress) {
for (PigProgressNotificationListener listener: listeners) {
- listener.progressUpdatedNotification(progress);
+ listener.progressUpdatedNotification(id, progress);
}
}
public void emitLaunchCompletedNotification(int numJobsSucceeded) {
for (PigProgressNotificationListener listener: listeners) {
- listener.launchCompletedNotification(numJobsSucceeded);
+ listener.launchCompletedNotification(id, numJobsSucceeded);
}
}
@@ -536,6 +544,14 @@ public class ScriptState {
return sb.toString();
}
+ public void setPigContext(PigContext pigContext) {
+ this.pigContext = pigContext;
+ }
+
+ public PigContext getPigContext() {
+ return pigContext;
+ }
+
private static class FeatureVisitor extends PhyPlanVisitor {
private BitSet feature;
@@ -588,7 +604,7 @@ public class ScriptState {
}
}
- public static class LogicalPlanFeatureVisitor extends LOVisitor {
+ static class LogicalPlanFeatureVisitor extends LOVisitor {
private BitSet feature;
Added: pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java?rev=1056536&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java Fri Jan 7 22:16:22 2011
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.JobStats.JobState;
+
+/**
+ * SimplePigStats encapsulates the statistics collected from a running script.
+ * It includes status of the execution, the DAG of its MR jobs, as well as
+ * information about outputs and inputs of the script.
+ * <p>
+ * This implementation describes a single, non-embedded run:
+ * {@link #isEmbedded()} returns {@code false}, and the embedded-only queries
+ * {@link #getAllStats()} and {@link #getAllErrorMessages()} throw
+ * {@link UnsupportedOperationException}.
+ */
+final class SimplePigStats extends PigStats {
+
+    private static final Log LOG = LogFactory.getLog(SimplePigStats.class);
+
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    private PigContext pigContext;
+
+    private JobClient jobClient;
+
+    private JobControlCompiler jcc;
+
+    private JobGraph jobPlan;
+
+    // map MR job id (or Job.toString() while no id is assigned) to MapReduceOper
+    private Map<String, MapReduceOper> jobMroMap;
+
+    // map each MapReduceOper to the JobStats node created for it in jobPlan
+    private Map<MapReduceOper, JobStats> mroJobMap;
+
+    // output alias -> OutputStats; built lazily on the first call to result()
+    private Map<String, OutputStats> aliasOutputMap;
+
+    // wall-clock millis; -1 until start()/stop() record them
+    private long startTime = -1;
+    private long endTime = -1;
+
+    private String userId;
+
+    /**
+     * Builds the job DAG from an MR plan: one JobStats node per
+     * MapReduceOper, connected like the operators in the plan. Constructing
+     * it replaces the enclosing instance's jobPlan and mroJobMap.
+     */
+    private class JobGraphBuilder extends MROpPlanVisitor {
+
+        public JobGraphBuilder(MROperPlan plan) {
+            super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
+                    plan));
+            jobPlan = new JobGraph();
+            mroJobMap = new HashMap<MapReduceOper, JobStats>();
+        }
+
+        @Override
+        public void visitMROp(MapReduceOper mr) throws VisitorException {
+            JobStats js = new JobStats(
+                    mr.getOperatorKey().toString(), jobPlan);
+            jobPlan.add(js);
+            List<MapReduceOper> preds = getPlan().getPredecessors(mr);
+            if (preds != null) {
+                for (MapReduceOper pred : preds) {
+                    // relies on the dependency-order walker having visited
+                    // (and registered) every predecessor already
+                    JobStats jpred = mroJobMap.get(pred);
+                    if (!jobPlan.isConnected(jpred, js)) {
+                        jobPlan.connect(jpred, js);
+                    }
+                }
+            }
+            mroJobMap.put(mr, js);
+        }
+    }
+
+    /**
+     * Prints a JobGraph: one line per job id, followed by "\t->\t" and the
+     * comma-terminated ids of its successors when it has any.
+     */
+    static class JobGraphPrinter extends PlanVisitor {
+
+        StringBuffer buf;
+
+        protected JobGraphPrinter(OperatorPlan plan) {
+            super(plan,
+                    new org.apache.pig.newplan.DependencyOrderWalker(
+                            plan));
+            buf = new StringBuffer();
+        }
+
+        public void visit(JobStats op) throws FrontendException {
+            buf.append(op.getJobId());
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null) {
+                buf.append("\t->\t");
+                for (Operator p : succs) {
+                    buf.append(((JobStats) p).getJobId()).append(",");
+                }
+            }
+            buf.append("\n");
+        }
+
+        /**
+         * Returns the text printed so far plus a trailing newline. The buffer
+         * itself is left untouched, so repeated calls return the same text.
+         */
+        @Override
+        public String toString() {
+            return buf.toString() + "\n";
+        }
+    }
+
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+
+    @Override
+    public boolean isSuccessful() {
+        return (returnCode == ReturnCode.SUCCESS);
+    }
+
+    /** Returns the Pig properties, or null when start() never recorded a PigContext. */
+    @Override
+    public Properties getPigProperties() {
+        if (pigContext == null) {
+            return null;
+        }
+        return pigContext.getProperties();
+    }
+
+    @Override
+    public JobGraph getJobGraph() {
+        return jobPlan;
+    }
+
+    @Override
+    public List<String> getOutputLocations() {
+        ArrayList<String> locations = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {
+            locations.add(output.getLocation());
+        }
+        return Collections.unmodifiableList(locations);
+    }
+
+    @Override
+    public List<String> getOutputNames() {
+        ArrayList<String> names = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {
+            names.add(output.getName());
+        }
+        return Collections.unmodifiableList(names);
+    }
+
+    /** Returns the bytes written to the given location, or -1 if unknown. */
+    @Override
+    public long getNumberBytes(String location) {
+        if (location == null) {
+            return -1;
+        }
+        OutputStats output = findOutputByLocation(location);
+        return (output == null) ? -1 : output.getBytes();
+    }
+
+    /** Returns the records written to the given location, or -1 if unknown. */
+    @Override
+    public long getNumberRecords(String location) {
+        if (location == null) {
+            return -1;
+        }
+        OutputStats output = findOutputByLocation(location);
+        return (output == null) ? -1 : output.getNumberRecords();
+    }
+
+    /** Returns the alias stored into the given location, or null if unknown. */
+    @Override
+    public String getOutputAlias(String location) {
+        if (location == null) {
+            return null;
+        }
+        OutputStats output = findOutputByLocation(location);
+        return (output == null) ? null : output.getAlias();
+    }
+
+    // Returns the first output whose name equals the last path component of
+    // location, or null when no output matches.
+    private OutputStats findOutputByLocation(String location) {
+        String name = new Path(location).getName();
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                return output;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            ret += it.next().getSMMSpillCount();
+        }
+        return ret;
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            ret += it.next().getProactiveSpillCountObjects();
+        }
+        return ret;
+    }
+
+    @Override
+    public long getProactiveSpillCountRecords() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            ret += it.next().getProactiveSpillCountRecs();
+        }
+        return ret;
+    }
+
+    /** Sums the positive per-job byte counts; non-positive (unknown) values are skipped. */
+    @Override
+    public long getBytesWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getBytesWritten();
+            if (n > 0) {
+                ret += n;
+            }
+        }
+        return ret;
+    }
+
+    /** Sums the positive per-job record counts; non-positive (unknown) values are skipped. */
+    @Override
+    public long getRecordWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getRecordWrittern();
+            if (n > 0) {
+                ret += n;
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public String getScriptId() {
+        return ScriptState.get().getId();
+    }
+
+    @Override
+    public String getFeatures() {
+        return ScriptState.get().getScriptFeatures();
+    }
+
+    /** Returns the run time in millis, or -1 unless both start and end were recorded. */
+    @Override
+    public long getDuration() {
+        return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
+    }
+
+    @Override
+    public int getNumberJobs() {
+        return jobPlan.size();
+    }
+
+    @Override
+    public List<OutputStats> getOutputStats() {
+        List<OutputStats> outputs = new ArrayList<OutputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (OutputStats os : iter.next().getOutputs()) {
+                outputs.add(os);
+            }
+        }
+        return Collections.unmodifiableList(outputs);
+    }
+
+    /**
+     * Returns the OutputStats stored under the given alias, or null.
+     * The alias map is built once, on the first call, from the outputs known
+     * at that moment; outputs without an alias are skipped with a warning.
+     */
+    @Override
+    public OutputStats result(String alias) {
+        if (aliasOutputMap == null) {
+            aliasOutputMap = new HashMap<String, OutputStats>();
+            Iterator<JobStats> iter = jobPlan.iterator();
+            while (iter.hasNext()) {
+                for (OutputStats os : iter.next().getOutputs()) {
+                    String a = os.getAlias();
+                    if (a == null || a.length() == 0) {
+                        LOG.warn("Output alias isn't avalable for " + os.getLocation());
+                        continue;
+                    }
+                    aliasOutputMap.put(a, os);
+                }
+            }
+        }
+        return aliasOutputMap.get(alias);
+    }
+
+    @Override
+    public List<InputStats> getInputStats() {
+        List<InputStats> inputs = new ArrayList<InputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (InputStats is : iter.next().getInputs()) {
+                inputs.add(is);
+            }
+        }
+        return Collections.unmodifiableList(inputs);
+    }
+
+    SimplePigStats() {
+        jobMroMap = new HashMap<String, MapReduceOper>();
+        // start() replaces this through JobGraphBuilder; initializing it here
+        // keeps the lookups in addJobStats/addJobStatsForNative/mapMROperToJob
+        // from throwing NPE when start() bailed out on invalid params.
+        mroJobMap = new HashMap<MapReduceOper, JobStats>();
+        jobPlan = new JobGraph();
+    }
+
+    /**
+     * Records the execution context, builds the job DAG from mrPlan and
+     * stamps the start time. Does nothing (beyond a warning) when pigContext,
+     * jobClient or jcc is null.
+     */
+    void start(PigContext pigContext, JobClient jobClient,
+            JobControlCompiler jcc, MROperPlan mrPlan) {
+
+        if (pigContext == null || jobClient == null || jcc == null) {
+            LOG.warn("invalid params: " + pigContext + jobClient + jcc);
+            return;
+        }
+
+        this.pigContext = pigContext;
+        this.jobClient = jobClient;
+        this.jcc = jcc;
+
+        // build job DAG with job ids assigned to null
+        try {
+            new JobGraphBuilder(mrPlan).visit();
+        } catch (VisitorException e) {
+            LOG.warn("unable to build job plan", e);
+        }
+
+        startTime = System.currentTimeMillis();
+        userId = System.getProperty("user.name");
+    }
+
+    /**
+     * Stamps the end time and derives the return code: SUCCESS when every job
+     * in the DAG succeeded, PARTIAL_FAILURE when only some did, FAILURE otherwise.
+     */
+    void stop() {
+        endTime = System.currentTimeMillis();
+        int m = getNumberSuccessfulJobs();
+        int n = getNumberFailedJobs();
+
+        if (n == 0 && m > 0 && m == jobPlan.size()) {
+            returnCode = ReturnCode.SUCCESS;
+        } else if (m > 0 && m < jobPlan.size()) {
+            returnCode = ReturnCode.PARTIAL_FAILURE;
+        } else {
+            returnCode = ReturnCode.FAILURE;
+        }
+    }
+
+    boolean isInitialized() {
+        return startTime > 0;
+    }
+
+    JobClient getJobClient() {
+        return jobClient;
+    }
+
+    JobControlCompiler getJobControlCompiler() {
+        return jcc;
+    }
+
+    /**
+     * Looks up the JobStats for a Hadoop job previously registered through
+     * mapMROperToJob and attaches the job's alias and configuration.
+     *
+     * @return the updated JobStats, or null (with a warning) when the job or
+     *         its MR operator is unknown
+     */
+    @SuppressWarnings("deprecation")
+    JobStats addJobStats(Job job) {
+        JobID jobId = job.getAssignedJobID();
+        // mapMROperToJob keys by the assigned id, or by Job.toString() if none
+        String key = (jobId != null) ? jobId.toString() : job.toString();
+        MapReduceOper mro = jobMroMap.get(key);
+        if (mro == null) {
+            LOG.warn("unable to get MR oper for job: " + key);
+            return null;
+        }
+        JobStats js = mroJobMap.get(mro);
+        if (js == null) {
+            LOG.warn("null job stats for mro: " + mro.getOperatorKey());
+            return null;
+        }
+        js.setAlias(mro);
+        js.setConf(job.getJobConf());
+        return js;
+    }
+
+    /**
+     * Assigns a job id to the JobStats of a native MapReduce operator and
+     * attaches its alias.
+     *
+     * @return the updated JobStats, or null (with a warning) when the
+     *         operator is not part of the job DAG
+     */
+    @SuppressWarnings("deprecation")
+    public JobStats addJobStatsForNative(NativeMapReduceOper mr) {
+        JobStats js = mroJobMap.get(mr);
+        if (js == null) {
+            LOG.warn("null job stats for mro: " + mr.getOperatorKey());
+            return null;
+        }
+        js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber()));
+        js.setAlias(mr);
+        return js;
+    }
+
+    /**
+     * Logs a human-readable summary of the run (versions, timing, per-job
+     * stats, inputs, outputs, counters and the job DAG) at INFO level.
+     */
+    void display() {
+        if (returnCode == ReturnCode.UNKNOWN) {
+            LOG.warn("unknown return code, can't display the results");
+            return;
+        }
+        if (pigContext == null) {
+            LOG.warn("unknown exec type, don't display the results");
+            return;
+        }
+
+        // currently counters are not working in local mode - see PIG-1286
+        ExecType execType = pigContext.getExecType();
+        boolean isLocal = (execType == ExecType.LOCAL);
+        if (isLocal) {
+            LOG.info("Detected Local mode. Stats reported below may be incomplete");
+        }
+
+        SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder sb = new StringBuilder();
+        sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
+        sb.append(getHadoopVersion()).append("\t").append(getPigVersion()).append("\t")
+                .append(userId).append("\t")
+                .append(sdf.format(new Date(startTime))).append("\t")
+                .append(sdf.format(new Date(endTime))).append("\t")
+                .append(getFeatures()).append("\n");
+        sb.append("\n");
+        if (returnCode == ReturnCode.SUCCESS) {
+            sb.append("Success!\n");
+        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Some jobs have failed! Stop running all dependent jobs\n");
+        } else {
+            sb.append("Failed!\n");
+        }
+        sb.append("\n");
+
+        if (returnCode == ReturnCode.SUCCESS
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Job Stats (time in seconds):\n");
+            if (isLocal) {
+                sb.append(JobStats.SUCCESS_HEADER_LOCAL).append("\n");
+            } else {
+                sb.append(JobStats.SUCCESS_HEADER).append("\n");
+            }
+            for (JobStats js : jobPlan.getSuccessfulJobs()) {
+                sb.append(js.getDisplayString(isLocal));
+            }
+            sb.append("\n");
+        }
+        if (returnCode == ReturnCode.FAILURE
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Failed Jobs:\n");
+            sb.append(JobStats.FAILURE_HEADER).append("\n");
+            for (JobStats js : jobPlan.getFailedJobs()) {
+                sb.append(js.getDisplayString(isLocal));
+            }
+            sb.append("\n");
+        }
+        sb.append("Input(s):\n");
+        for (InputStats is : getInputStats()) {
+            sb.append(is.getDisplayString(isLocal));
+        }
+        sb.append("\n");
+        sb.append("Output(s):\n");
+        for (OutputStats ds : getOutputStats()) {
+            sb.append(ds.getDisplayString(isLocal));
+        }
+
+        if (!isLocal) {
+            sb.append("\nCounters:\n");
+            sb.append("Total records written : " + getRecordWritten()).append("\n");
+            sb.append("Total bytes written : " + getBytesWritten()).append("\n");
+            sb.append("Spillable Memory Manager spill count : "
+                    + getSMMSpillCount()).append("\n");
+            sb.append("Total bags proactively spilled: "
+                    + getProactiveSpillCountObjects()).append("\n");
+            sb.append("Total records proactively spilled: "
+                    + getProactiveSpillCountRecords()).append("\n");
+        }
+
+        sb.append("\nJob DAG:\n").append(jobPlan.toString());
+
+        LOG.info("Script Statistics: \n" + sb.toString());
+    }
+
+    /**
+     * Associates a submitted Hadoop job with its MR operator: sets the job id
+     * on the operator's JobStats and records job-key -> operator so that
+     * addJobStats can find it later. Logs a warning and does nothing when the
+     * operator or its JobStats is missing.
+     */
+    @SuppressWarnings("deprecation")
+    void mapMROperToJob(MapReduceOper mro, Job job) {
+        if (mro == null) {
+            LOG.warn("null MR operator");
+            return;
+        }
+        JobStats js = mroJobMap.get(mro);
+        if (js == null) {
+            LOG.warn("null job stats for mro: " + mro.getOperatorKey());
+            return;
+        }
+        JobID id = job.getAssignedJobID();
+        js.setId(id);
+        if (id != null) {
+            jobMroMap.put(id.toString(), mro);
+        } else {
+            jobMroMap.put(job.toString(), mro);
+        }
+    }
+
+    /**
+     * Logs the backend exception and attaches it to the JobStats whose job id
+     * matches the job's assigned id. Nothing is attached when the job has no
+     * assigned id or e is null.
+     */
+    void setBackendException(Job job, Exception e) {
+        if (e instanceof PigException) {
+            LOG.error("ERROR " + ((PigException) e).getErrorCode() + ": "
+                    + e.getLocalizedMessage());
+        } else if (e != null) {
+            LOG.error("ERROR: " + e.getLocalizedMessage());
+        }
+
+        if (job.getAssignedJobID() == null || e == null) {
+            LOG.debug("unable to set backend exception");
+            return;
+        }
+        String id = job.getAssignedJobID().toString();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            if (id.equals(js.getJobId())) {
+                js.setBackendException(e);
+                break;
+            }
+        }
+    }
+
+    PigContext getPigContext() {
+        return pigContext;
+    }
+
+    int getNumberSuccessfulJobs() {
+        return countJobsInState(JobState.SUCCESS);
+    }
+
+    int getNumberFailedJobs() {
+        return countJobsInState(JobState.FAILED);
+    }
+
+    // Counts the jobs in the DAG whose current state equals the given state.
+    private int countJobsInState(JobState state) {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == state) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+}
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Fri Jan 7 22:16:22 2011
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import junit.framework.Assert;
@@ -507,60 +509,68 @@ public class TestPigRunner {
}
}
- private static class TestNotificationListener implements PigProgressNotificationListener {
+    /**
+     * Progress listener that counts, per script id, the jobs announced,
+     * submitted, started and finished, and asserts in
+     * launchCompletedNotification that every count equals the number of jobs
+     * that succeeded. Public so other tests (e.g. TestScriptLanguage) can
+     * pass it to PigRunner.run.
+     */
+    public static class TestNotificationListener implements PigProgressNotificationListener {
- private int numJobsToLaunch = 0;
- private int numJobsSubmitted = 0;
- private int numJobStarted = 0;
- private int numJobFinished = 0;
+        // script id -> counters indexed by the constants below. Synchronized
+        // because notifications for different ids may arrive from different
+        // threads when an embedded script runs several bindings in parallel.
+        private final Map<String, int[]> numMap =
+                java.util.Collections.synchronizedMap(new HashMap<String, int[]>());
+
+        private static final int JOBS_TO_LAUNCH = 0;
+        private static final int JOBS_SUBMITTED = 1;
+        private static final int JOB_STARTED = 2;
+        private static final int JOB_FINISHED = 3;
+        private static final int NUM_COUNTERS = 4;
+
+        // Returns the counters created for id by launchStartedNotification,
+        // failing with a clear message (instead of an NPE) if there are none.
+        private int[] countersFor(String id) {
+            int[] nums = numMap.get(id);
+            Assert.assertNotNull("no launchStartedNotification for id: " + id, nums);
+            return nums;
+        }
+
@Override
- public void launchStartedNotification(int numJobsToLaunch) {
- System.out.println("++++ numJobsToLaunch: " + numJobsToLaunch);
- this.numJobsToLaunch = numJobsToLaunch;
+        public void launchStartedNotification(String id, int numJobsToLaunch) {
+            System.out.println("id: " + id + " numJobsToLaunch: " + numJobsToLaunch);
+            // (re)start counting for this script id
+            int[] nums = new int[NUM_COUNTERS];
+            numMap.put(id, nums);
+            nums[JOBS_TO_LAUNCH] = numJobsToLaunch;
}
@Override
- public void jobFailedNotification(JobStats jobStats) {
- System.out.println("++++ job failed: " + jobStats.getJobId());
+        public void jobFailedNotification(String id, JobStats jobStats) {
+            System.out.println("id: " + id + " job failed: " + jobStats.getJobId());
}
@Override
- public void jobFinishedNotification(JobStats jobStats) {
- System.out.println("++++ job finished: " + jobStats.getJobId());
- numJobFinished++;
+        public void jobFinishedNotification(String id, JobStats jobStats) {
+            System.out.println("id: " + id + " job finished: " + jobStats.getJobId());
+            countersFor(id)[JOB_FINISHED]++;
}
@Override
- public void jobStartedNotification(String assignedJobId) {
- System.out.println("++++ job started: " + assignedJobId);
- numJobStarted++;
+        public void jobStartedNotification(String id, String assignedJobId) {
+            System.out.println("id: " + id + " job started: " + assignedJobId);
+            countersFor(id)[JOB_STARTED]++;
}
@Override
- public void jobsSubmittedNotification(int numJobsSubmitted) {
- System.out.println("++++ jobs submitted: " + numJobsSubmitted);
- this.numJobsSubmitted += numJobsSubmitted;
+        public void jobsSubmittedNotification(String id, int numJobsSubmitted) {
+            System.out.println("id: " + id + " jobs submitted: " + numJobsSubmitted);
+            countersFor(id)[JOBS_SUBMITTED] += numJobsSubmitted;
}
@Override
- public void launchCompletedNotification(int numJobsSucceeded) {
- System.out.println("++++ numJobsSucceeded: " + numJobsSucceeded);
+        public void launchCompletedNotification(String id, int numJobsSucceeded) {
+            System.out.println("id: " + id + " numJobsSucceeded: " + numJobsSucceeded);
System.out.println("");
- assertEquals(this.numJobsToLaunch, numJobsSucceeded);
- assertEquals(this.numJobsSubmitted, numJobsSucceeded);
- assertEquals(this.numJobStarted, numJobsSucceeded);
- assertEquals(this.numJobFinished, numJobsSucceeded);
+            int[] nums = countersFor(id);
+            assertEquals(nums[JOBS_TO_LAUNCH], numJobsSucceeded);
+            assertEquals(nums[JOBS_SUBMITTED], numJobsSucceeded);
+            assertEquals(nums[JOB_STARTED], numJobsSucceeded);
+            assertEquals(nums[JOB_FINISHED], numJobsSucceeded);
}
@Override
- public void outputCompletedNotification(OutputStats outputStats) {
- System.out.println("++++ output done: " + outputStats.getLocation());
+        public void outputCompletedNotification(String id, OutputStats outputStats) {
+            System.out.println("id: " + id + " output done: " + outputStats.getLocation());
}
@Override
- public void progressUpdatedNotification(int progress) {
- System.out.println("++++ progress: " + progress + "%");
+        public void progressUpdatedNotification(String id, int progress) {
+            System.out.println("id: " + id + " progress: " + progress + "%");
}
}
Added: pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java?rev=1056536&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java Fri Jan 7 22:16:22 2011
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestScriptLanguage {
+
+ static MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ cluster.shutDown();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Util.deleteFile(cluster, "simple_out");
+ }
+
+ @Test
+ public void firstTest() throws Exception {
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "Pig.fs(\"-rmr simple_out\")",
+ "input = 'simple_table'",
+ "output = 'simple_out'",
+ "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
+ "Q = P.bind({'input':input, 'output':output})",
+ "stats = Q.runSingle()",
+ "if stats.isSuccessful():",
+ "\tprint 'success!'",
+ "else:",
+ "\traise 'failed'"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "simple_table", input);
+ Util.createLocalInputFile( "testScript.py", script);
+
+ ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+ Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+ assertEquals(1, statsMap.size());
+ Iterator<List<PigStats>> it = statsMap.values().iterator();
+ PigStats stats = it.next().get(0);
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ String name = stats.getOutputNames().get(0);
+ assertEquals("simple_out", name);
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+
+ @Test
+ public void secondTest() throws Exception {
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "Pig.fs(\"-rmr simple_out\")",
+ "input = 'simple_table_6'",
+ "output = 'simple_out'",
+ "P = Pig.compileFromFile(\"\"\"testScript.pig\"\"\")",
+ "Q = P.bind({'input':input, 'output':output})",
+ "stats = Q.runSingle()",
+ "if stats.isSuccessful():",
+ "\tprint 'success!'",
+ "else:",
+ "\traise 'failed'"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ String[] pigLatin = {
+ "a = load '$input';",
+ "store a into '$output';"
+ };
+
+ Util.createInputFile(cluster, "simple_table_6", input);
+ Util.createLocalInputFile( "testScript.py", script);
+ Util.createLocalInputFile( "testScript.pig", pigLatin);
+
+ ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+ Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+ assertEquals(1, statsMap.size());
+ Iterator<List<PigStats>> it = statsMap.values().iterator();
+ PigStats stats = it.next().get(0);
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ String name = stats.getOutputNames().get(0);
+ assertEquals("simple_out", name);
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+
+ @Test
+ public void firstParallelTest() throws Exception {
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "Pig.fs(\"-rmr simple_out\")",
+ "Pig.fs(\"-rmr simple_out2\")",
+ "input = 'simple_table_1'",
+ "output1 = 'simple_out'",
+ "output2 = 'simple_out'",
+ "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
+ "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
+ "stats = Q.run()"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "simple_table_1", input);
+ Util.createLocalInputFile( "testScript.py", script);
+
+ ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+ Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+ assertEquals(1, statsMap.size());
+ assertEquals("mypipeline", statsMap.keySet().iterator().next());
+ List<PigStats> lst = statsMap.get("mypipeline");
+ assertEquals(2, lst.size());
+ for (PigStats stats : lst) {
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+ }
+
+ @Test
+ public void pigRunnerTest() throws Exception {
+ String[] script = {
+ "from org.apache.pig.scripting import *",
+ "Pig.fs(\"-rmr simple_out\")",
+ "input = 'simple_table_2'",
+ "output = 'simple_out'",
+ "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
+ "Q = P.bind({'input':input, 'output':output})",
+ "stats = Q.runSingle()",
+ "if stats.isSuccessful():",
+ "\tprint 'success!'",
+ "else:",
+ "\traise 'failed'"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "simple_table_2", input);
+ Util.createLocalInputFile( "testScript.py", script);
+
+ String[] args = { "-g", "jython", "testScript.py" };
+
+ PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
+ assertTrue(mainStats.isEmbedded());
+ assertTrue(mainStats.isSuccessful());
+ Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
+ assertEquals(1, statsMap.size());
+ Iterator<List<PigStats>> it = statsMap.values().iterator();
+ PigStats stats = it.next().get(0);
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ String name = stats.getOutputNames().get(0);
+ assertEquals("simple_out", name);
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+
+ @Test
+ public void runParallelTest() throws Exception {
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "input = 'simple_table_3'",
+ "Pig.fs(\"-rmr simple_out\")",
+ "Pig.fs(\"-rmr simple_out2\")",
+ "output1 = 'simple_out'",
+ "output2 = 'simple_out2'",
+ "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
+ "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
+ "stats = Q.run()"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "simple_table_3", input);
+ Util.createLocalInputFile( "testScript.py", script);
+
+ String[] args = { "-g", "jython", "testScript.py" };
+ PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
+ assertTrue(mainStats.isEmbedded());
+ assertTrue(mainStats.isSuccessful());
+ Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
+ assertEquals(1, statsMap.size());
+ assertEquals("mypipeline", statsMap.keySet().iterator().next());
+ List<PigStats> lst = statsMap.get("mypipeline");
+ assertEquals(2, lst.size());
+ for (PigStats stats : lst) {
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+ }
+
+ @Test
+ public void runLoopTest() throws Exception {
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "Pig.fs(\"-rmr simple_out\")",
+ "Pig.fs(\"-rmr simple_out2\")",
+ "input = 'simple_table_4'",
+ "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
+ "for x in [\"simple_out\", \"simple_out2\"]:",
+ "\tQ = P.bind({'input':input, 'output':x}).run()"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "simple_table_4", input);
+ Util.createLocalInputFile( "testScript.py", script);
+
+ String[] args = { "-g", "jython", "testScript.py" };
+ PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
+ assertTrue(mainStats.isEmbedded());
+ assertTrue(mainStats.isSuccessful());
+ Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
+ assertEquals(1, statsMap.size());
+ assertEquals("mypipeline", statsMap.keySet().iterator().next());
+ List<PigStats> lst = statsMap.get("mypipeline");
+ assertEquals(2, lst.size());
+ for (PigStats stats : lst) {
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+ }
+
+ @Test
+ public void bindLocalVariableTest() throws Exception {
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "Pig.fs(\"-rmr simple_out\")",
+ "input = 'simple_table_5'",
+ "output = 'simple_out'",
+ "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
+ "Q = P.bind()",
+ "stats = Q.runSingle()",
+ "if stats.isSuccessful():",
+ "\tprint 'success!'",
+ "else:",
+ "\traise 'failed'"
+ };
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "simple_table_5", input);
+ Util.createLocalInputFile( "testScript.py", script);
+
+ ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+ Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+ assertEquals(1, statsMap.size());
+ Iterator<List<PigStats>> it = statsMap.values().iterator();
+ PigStats stats = it.next().get(0);
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, stats.getNumberJobs());
+ String name = stats.getOutputNames().get(0);
+ assertEquals("simple_out", name);
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+ }
+
+}