You are viewing a plain-text version of this content. The link to the canonical version (originally the word "here") did not survive the conversion to plain text.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/11 22:50:50 UTC
svn commit: r1540855 - in /pig/trunk: ./ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapreduce/ test/org/apache/pig/test/
Author: cheolsoo
Date: Mon Nov 11 21:50:50 2013
New Revision: 1540855
URL: http://svn.apache.org/r1540855
Log:
PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Nov 11 21:50:50 2013
@@ -46,6 +46,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
+
PIG-3553: HadoopJobHistoryLoader fails to load job history on hadoop v 1.2 (lgiri via cheolsoo)
PIG-3559: Trunk is broken by PIG-3522 (cheolsoo)
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Mon Nov 11 21:50:50 2013
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -58,6 +59,8 @@ public abstract class JobStats extends O
protected ArrayList<InputStats> inputs;
+ protected Configuration conf;
+
private String errorMsg;
private Exception exception = null;
@@ -70,6 +73,13 @@ public abstract class JobStats extends O
public abstract String getJobId();
+ public void setConf(Configuration conf) {
+ if (conf == null) {
+ return;
+ }
+ this.conf = conf;
+ }
+
public JobState getState() { return state; }
public boolean isSuccessful() { return (state == JobState.SUCCESS); }
@@ -304,4 +314,3 @@ public abstract class JobStats extends O
abstract public Map<String, Long> getMultiStoreCounters();
}
-
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Nov 11 21:50:50 2013
@@ -28,11 +28,14 @@ import java.util.Properties;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.pig.PigException;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceAudience.Private;
import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.newplan.BaseOperatorPlan;
@@ -42,26 +45,34 @@ import org.apache.pig.newplan.PlanVisito
import org.apache.pig.tools.pigstats.JobStats.JobState;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
+import com.google.common.collect.Maps;
+
/**
- * PigStats encapsulates the statistics collected from a running script.
- * It includes status of the execution, the DAG of its MR jobs, as well as
- * information about outputs and inputs of the script.
+ * PigStats encapsulates the statistics collected from a running script. It
+ * includes status of the execution, the DAG of its Hadoop jobs, as well as
+ * information about outputs and inputs of the script.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class PigStats {
-
private static final Log LOG = LogFactory.getLog(PigStats.class);
-
private static ThreadLocal<PigStats> tps = new ThreadLocal<PigStats>();
-
+
+ protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ protected long startTime = -1;
+ protected long endTime = -1;
+
+ protected String userId;
+ protected JobGraph jobPlan;
+ protected PigContext pigContext;
+ protected Map<String, OutputStats> aliasOuputMap;
+
+ protected int errorCode = -1;
+ protected String errorMessage = null;
+ protected Throwable errorThrowable = null;
protected int returnCode = ReturnCode.UNKNOWN;
-
- private String errorMessage;
- private int errorCode = -1;
- private Throwable errorThrowable = null;
-
+
public static PigStats get() {
if (tps.get() == null) {
LOG.info("PigStats has not been set. Defaulting to SimplePigStats");
@@ -69,37 +80,37 @@ public abstract class PigStats {
}
return tps.get();
}
-
+
static void set(PigStats stats) {
tps.set(stats);
}
-
+
public static PigStats start(PigStats stats) {
tps.set(stats);
return tps.get();
}
-
+
/**
* Returns code are defined in {@link ReturnCode}
*/
public int getReturnCode() {
return returnCode;
}
-
+
/**
* Returns error message string
*/
public String getErrorMessage() {
return errorMessage;
}
-
+
/**
* Returns the error code of {@link PigException}
*/
public int getErrorCode() {
return errorCode;
}
-
+
/**
* Returns the error code of {@link PigException}
*/
@@ -110,152 +121,269 @@ public abstract class PigStats {
public abstract JobClient getJobClient();
public abstract boolean isEmbedded();
-
- public abstract boolean isSuccessful();
-
+
+ public boolean isSuccessful() {
+ return (getNumberJobs() == 0 && returnCode == ReturnCode.UNKNOWN
+ || returnCode == ReturnCode.SUCCESS);
+ }
+
public abstract Map<String, List<PigStats>> getAllStats();
-
- public abstract List<String> getAllErrorMessages();
-
+
+ public abstract List<String> getAllErrorMessages();
+
/**
* Returns the properties associated with the script
*/
- public abstract Properties getPigProperties();
-
+ public Properties getPigProperties() {
+ if (pigContext == null) {
+ return null;
+ }
+ return pigContext.getProperties();
+ }
+
/**
- * Returns the DAG of the MR jobs spawned by the script
+ * Returns the DAG of jobs spawned by the script
*/
- public abstract JobGraph getJobGraph();
-
+ public JobGraph getJobGraph() {
+ return jobPlan;
+ }
+
/**
* Returns the list of output locations in the script
*/
- public abstract List<String> getOutputLocations();
-
+ public List<String> getOutputLocations() {
+ ArrayList<String> locations = new ArrayList<String>();
+ for (OutputStats output : getOutputStats()) {
+ locations.add(output.getLocation());
+ }
+ return Collections.unmodifiableList(locations);
+ }
+
/**
* Returns the list of output names in the script
*/
- public abstract List<String> getOutputNames();
-
+ public List<String> getOutputNames() {
+ ArrayList<String> names = new ArrayList<String>();
+ for (OutputStats output : getOutputStats()) {
+ names.add(output.getName());
+ }
+ return Collections.unmodifiableList(names);
+ }
+
/**
* Returns the number of bytes for the given output location,
* -1 for invalid location or name.
*/
- public abstract long getNumberBytes(String location);
-
+ public long getNumberBytes(String location) {
+ if (location == null) return -1;
+ String name = new Path(location).getName();
+ long count = -1;
+ for (OutputStats output : getOutputStats()) {
+ if (name.equals(output.getName())) {
+ count = output.getBytes();
+ break;
+ }
+ }
+ return count;
+ }
+
/**
* Returns the number of records for the given output location,
* -1 for invalid location or name.
*/
- public abstract long getNumberRecords(String location);
-
+ public long getNumberRecords(String location) {
+ if (location == null) return -1;
+ String name = new Path(location).getName();
+ long count = -1;
+ for (OutputStats output : getOutputStats()) {
+ if (name.equals(output.getName())) {
+ count = output.getNumberRecords();
+ break;
+ }
+ }
+ return count;
+ }
+
/**
* Returns the alias associated with this output location
*/
- public abstract String getOutputAlias(String location);
-
+ public String getOutputAlias(String location) {
+ if (location == null) {
+ return null;
+ }
+ String name = new Path(location).getName();
+ String alias = null;
+ for (OutputStats output : getOutputStats()) {
+ if (name.equals(output.getName())) {
+ alias = output.getAlias();
+ break;
+ }
+ }
+ return alias;
+ }
+
/**
* Returns the total spill counts from {@link SpillableMemoryManager}.
*/
public abstract long getSMMSpillCount();
-
+
/**
* Returns the total number of bags that spilled proactively
*/
public abstract long getProactiveSpillCountObjects();
-
+
/**
* Returns the total number of records that spilled proactively
*/
public abstract long getProactiveSpillCountRecords();
-
+
/**
* Returns the total bytes written to user specified HDFS
* locations of this script.
*/
- public abstract long getBytesWritten();
-
+ public long getBytesWritten() {
+ Iterator<JobStats> it = jobPlan.iterator();
+ long ret = 0;
+ while (it.hasNext()) {
+ long n = it.next().getBytesWritten();
+ if (n > 0) ret += n;
+ }
+ return ret;
+ }
+
/**
* Returns the total number of records in user specified output
* locations of this script.
*/
- public abstract long getRecordWritten();
+ public long getRecordWritten() {
+ Iterator<JobStats> it = jobPlan.iterator();
+ long ret = 0;
+ while (it.hasNext()) {
+ long n = it.next().getRecordWrittern();
+ if (n > 0) ret += n;
+ }
+ return ret;
+ }
public String getHadoopVersion() {
return ScriptState.get().getHadoopVersion();
}
-
+
public String getPigVersion() {
return ScriptState.get().getPigVersion();
}
-
- public abstract String getScriptId();
-
- public abstract String getFeatures();
-
- public abstract long getDuration();
-
- /**
- * Returns the number of MR jobs for this script
- */
- public abstract int getNumberJobs();
-
- public abstract List<OutputStats> getOutputStats();
-
- public abstract OutputStats result(String alias);
-
- public abstract List<InputStats> getInputStats();
-
- void setErrorMessage(String errorMessage) {
+
+ public String getScriptId() {
+ return ScriptState.get().getId();
+ }
+
+ public String getFeatures() {
+ return ScriptState.get().getScriptFeatures();
+ }
+
+ public long getDuration() {
+ return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
+ }
+
+ /**
+ * Returns the number of jobs for this script
+ */
+ public int getNumberJobs() {
+ return jobPlan.size();
+ }
+
+ public List<OutputStats> getOutputStats() {
+ List<OutputStats> outputs = new ArrayList<OutputStats>();
+ Iterator<JobStats> iter = jobPlan.iterator();
+ while (iter.hasNext()) {
+ for (OutputStats os : iter.next().getOutputs()) {
+ outputs.add(os);
+ }
+ }
+ return Collections.unmodifiableList(outputs);
+ }
+
+ public OutputStats result(String alias) {
+ if (aliasOuputMap == null) {
+ aliasOuputMap = Maps.newHashMap();
+ Iterator<JobStats> iter = jobPlan.iterator();
+ while (iter.hasNext()) {
+ for (OutputStats os : iter.next().getOutputs()) {
+ String a = os.getAlias();
+ if (a == null || a.length() == 0) {
+ LOG.warn("Output alias isn't avalable for " + os.getLocation());
+ continue;
+ }
+ aliasOuputMap.put(a, os);
+ }
+ }
+ }
+ return aliasOuputMap.get(alias);
+ }
+
+ public List<InputStats> getInputStats() {
+ List<InputStats> inputs = new ArrayList<InputStats>();
+ Iterator<JobStats> iter = jobPlan.iterator();
+ while (iter.hasNext()) {
+ for (InputStats is : iter.next().getInputs()) {
+ inputs.add(is);
+ }
+ }
+ return Collections.unmodifiableList(inputs);
+ }
+
+ public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
-
- void setErrorCode(int errorCode) {
+
+ public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
- }
-
- void setErrorThrowable(Throwable t) {
+ }
+
+ public void setErrorThrowable(Throwable t) {
this.errorThrowable = t;
}
-
+
+ public void setReturnCode(int returnCode) {
+ this.returnCode = returnCode;
+ }
+
/**
* This class prints a JobGraph
*/
public static class JobGraphPrinter extends PlanVisitor {
-
+
StringBuffer buf;
protected JobGraphPrinter(OperatorPlan plan) {
- super(plan,
- new org.apache.pig.newplan.DependencyOrderWalker(
- plan));
+ super(plan, new org.apache.pig.newplan.DependencyOrderWalker(plan));
buf = new StringBuffer();
}
-
+
public void visit(JobStats op) throws FrontendException {
buf.append(op.getJobId());
List<Operator> succs = plan.getSuccessors(op);
if (succs != null) {
buf.append("\t->\t");
- for (Operator p : succs) {
+ for (Operator p : succs) {
buf.append(((JobStats)p).getJobId()).append(",");
- }
+ }
}
buf.append("\n");
}
-
+
@Override
public String toString() {
buf.append("\n");
return buf.toString();
- }
+ }
}
-
+
/**
* JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
*/
public static class JobGraph extends BaseOperatorPlan implements Iterable<JobStats>{
-
+
@Override
public String toString() {
JobGraphPrinter jp = new JobGraphPrinter(this);
@@ -266,11 +394,11 @@ public abstract class PigStats {
}
return jp.toString();
}
-
+
/**
* Returns a List representation of the Job graph. Returned list is an
* ArrayList
- *
+ *
* @return List<JobStats>
*/
@SuppressWarnings("unchecked")
@@ -280,34 +408,34 @@ public abstract class PigStats {
public Iterator<JobStats> iterator() {
return new Iterator<JobStats>() {
- private Iterator<Operator> iter = getOperators();
+ private Iterator<Operator> iter = getOperators();
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
return iter.hasNext();
}
@Override
- public JobStats next() {
+ public JobStats next() {
return (JobStats)iter.next();
}
@Override
public void remove() {}
};
}
-
+
public boolean isConnected(Operator from, Operator to) {
List<Operator> succs = null;
succs = getSuccessors(from);
if (succs != null) {
for (Operator succ: succs) {
- if (succ.getName().equals(to.getName())
+ if (succ.getName().equals(to.getName())
|| isConnected(succ, to)) {
return true;
- }
+ }
}
}
return false;
}
-
+
public List<JobStats> getSuccessfulJobs() {
ArrayList<JobStats> lst = new ArrayList<JobStats>();
Iterator<JobStats> iter = iterator();
@@ -320,7 +448,7 @@ public abstract class PigStats {
Collections.sort(lst, new JobComparator());
return lst;
}
-
+
public List<JobStats> getFailedJobs() {
ArrayList<JobStats> lst = new ArrayList<JobStats>();
Iterator<JobStats> iter = iterator();
@@ -329,19 +457,79 @@ public abstract class PigStats {
if (js.getState() == JobState.FAILED) {
lst.add(js);
}
- }
+ }
return lst;
}
- }
-
+ }
+
private static class JobComparator implements Comparator<JobStats> {
@Override
- public int compare(JobStats o1, JobStats o2) {
+ public int compare(JobStats o1, JobStats o2) {
return o1.getJobId().compareTo(o2.getJobId());
- }
- }
-
- void setReturnCode(int returnCode) {
- this.returnCode = returnCode;
+ }
+ }
+
+ @Private
+ public void setBackendException(String jobId, Exception e) {
+ if (e instanceof PigException) {
+ LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": "
+ + e.getLocalizedMessage());
+ } else if (e != null) {
+ LOG.error("ERROR: " + e.getLocalizedMessage());
+ }
+
+ if (jobId == null || e == null) {
+ LOG.debug("unable to set backend exception");
+ return;
+ }
+ Iterator<JobStats> iter = jobPlan.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ if (jobId.equals(js.getJobId())) {
+ js.setBackendException(e);
+ break;
+ }
+ }
+ }
+
+ @Private
+ public PigContext getPigContext() {
+ return pigContext;
+ }
+
+ public void start() {
+ startTime = System.currentTimeMillis();
+ userId = System.getProperty("user.name");
+ }
+
+ public void stop() {
+ endTime = System.currentTimeMillis();
+ int failed = getNumberFailedJobs();
+ int succeeded = getNumberSuccessfulJobs();
+ if (failed == 0 && succeeded > 0 && succeeded == jobPlan.size()) {
+ returnCode = ReturnCode.SUCCESS;
+ } else if (succeeded > 0 && succeeded < jobPlan.size()) {
+ returnCode = ReturnCode.PARTIAL_FAILURE;
+ } else {
+ returnCode = ReturnCode.FAILURE;
+ }
+ }
+
+ public int getNumberSuccessfulJobs() {
+ Iterator<JobStats> iter = jobPlan.iterator();
+ int count = 0;
+ while (iter.hasNext()) {
+ if (iter.next().getState() == JobState.SUCCESS) count++;
+ }
+ return count;
+ }
+
+ public int getNumberFailedJobs() {
+ Iterator<JobStats> iter = jobPlan.iterator();
+ int count = 0;
+ while (iter.hasNext()) {
+ if (iter.next().getState() == JobState.FAILED) count++;
+ }
+ return count;
}
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Mon Nov 11 21:50:50 2013
@@ -57,14 +57,14 @@ import org.apache.pig.tools.pigstats.Pig
/**
- * This class encapsulates the runtime statistics of a MapReduce job.
+ * This class encapsulates the runtime statistics of a MapReduce job.
* Job statistics is collected when job is completed.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class MRJobStats extends JobStats {
-
-
+
+
MRJobStats(String name, JobGraph plan) {
super(name, plan);
}
@@ -72,27 +72,25 @@ public final class MRJobStats extends Jo
public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
"MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
"MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs";
-
+
public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
-
+
// currently counters are not working in local mode - see PIG-1286
public static final String SUCCESS_HEADER_LOCAL = "JobId\tAlias\tFeature\tOutputs";
-
+
private static final Log LOG = LogFactory.getLog(MRJobStats.class);
-
- private Configuration conf;
-
+
private List<POStore> mapStores = null;
-
+
private List<POStore> reduceStores = null;
-
+
private List<FileSpec> loads = null;
-
+
private Boolean disableCounter = false;
-
+
@SuppressWarnings("deprecation")
private JobID jobId;
-
+
private long maxMapTime = 0;
private long minMapTime = 0;
private long avgMapTime = 0;
@@ -104,7 +102,7 @@ public final class MRJobStats extends Jo
private int numberMaps = 0;
private int numberReduces = 0;
-
+
private long mapInputRecords = 0;
private long mapOutputRecords = 0;
private long reduceInputRecords = 0;
@@ -114,37 +112,37 @@ public final class MRJobStats extends Jo
private long spillCount = 0;
private long activeSpillCountObj = 0;
private long activeSpillCountRecs = 0;
-
- private HashMap<String, Long> multiStoreCounters
+
+ private HashMap<String, Long> multiStoreCounters
= new HashMap<String, Long>();
-
- private HashMap<String, Long> multiInputCounters
+
+ private HashMap<String, Long> multiInputCounters
= new HashMap<String, Long>();
-
+
@SuppressWarnings("deprecation")
private Counters counters = null;
-
- public String getJobId() {
- return (jobId == null) ? null : jobId.toString();
+
+ public String getJobId() {
+ return (jobId == null) ? null : jobId.toString();
}
-
+
public int getNumberMaps() { return numberMaps; }
-
+
public int getNumberReduces() { return numberReduces; }
-
+
public long getMaxMapTime() { return maxMapTime; }
-
+
public long getMinMapTime() { return minMapTime; }
-
+
public long getAvgMapTime() { return avgMapTime; }
-
+
public long getMaxReduceTime() { return maxReduceTime; }
-
+
public long getMinReduceTime() { return minReduceTime; }
-
- public long getAvgREduceTime() { return avgReduceTime; }
-
+
+ public long getAvgREduceTime() { return avgReduceTime; }
+
public long getMapInputRecords() { return mapInputRecords; }
public long getMapOutputRecords() { return mapOutputRecords; }
@@ -154,13 +152,13 @@ public final class MRJobStats extends Jo
public long getReduceInputRecords() { return reduceInputRecords; }
public long getSMMSpillCount() { return spillCount; }
-
+
public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
-
+
public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
-
+
public long getHdfsBytesWritten() { return hdfsBytesWritten; }
-
+
@SuppressWarnings("deprecation")
public Counters getHadoopCounters() { return counters; }
@@ -168,11 +166,11 @@ public final class MRJobStats extends Jo
public Map<String, Long> getMultiStoreCounters() {
return Collections.unmodifiableMap(multiStoreCounters);
}
-
+
public String getAlias() {
return (String)getAnnotation(ALIAS);
}
-
+
public String getAliasLocation() {
return (String)getAnnotation(ALIAS_LOCATION);
}
@@ -180,7 +178,7 @@ public final class MRJobStats extends Jo
public String getFeature() {
return (String)getAnnotation(FEATURE);
}
-
+
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (v instanceof JobGraphPrinter) {
@@ -194,43 +192,43 @@ public final class MRJobStats extends Jo
this.jobId = jobId;
}
+ @Override
@SuppressWarnings("unchecked")
- void setConf(Configuration conf) {
- if (conf == null) return;
- this.conf = conf;
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
try {
this.mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
.get(JobControlCompiler.PIG_MAP_STORES));
this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
- .get(JobControlCompiler.PIG_REDUCE_STORES));
+ .get(JobControlCompiler.PIG_REDUCE_STORES));
this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
.get("pig.inputs"));
this.disableCounter = conf.getBoolean("pig.disable.counter", false);
} catch (IOException e) {
LOG.warn("Failed to deserialize the store list", e);
- }
+ }
}
-
+
void setMapStat(int size, long max, long min, long avg, long median) {
numberMaps = size;
maxMapTime = max;
minMapTime = min;
- avgMapTime = avg;
+ avgMapTime = avg;
medianMapTime = median;
}
-
+
void setReduceStat(int size, long max, long min, long avg, long median) {
numberReduces = size;
maxReduceTime = max;
minReduceTime = min;
- avgReduceTime = avg;
+ avgReduceTime = avg;
medianReduceTime = median;
- }
-
+ }
+
public String getDisplayString(boolean local) {
StringBuilder sb = new StringBuilder();
String id = (jobId == null) ? "N/A" : jobId.toString();
- if (state == JobState.FAILED || local) {
+ if (state == JobState.FAILED || local) {
sb.append(id).append("\t")
.append(getAlias()).append("\t")
.append(getFeature()).append("\t");
@@ -243,7 +241,7 @@ public final class MRJobStats extends Jo
.append(numberReduces).append("\t");
if (numberMaps == 0) {
sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
- } else {
+ } else {
sb.append(maxMapTime/1000).append("\t")
.append(minMapTime/1000).append("\t")
.append(avgMapTime/1000).append("\t")
@@ -262,7 +260,7 @@ public final class MRJobStats extends Jo
}
for (OutputStats os : outputs) {
sb.append(os.getLocation()).append(",");
- }
+ }
sb.append("\n");
return sb.toString();
}
@@ -295,9 +293,9 @@ public final class MRJobStats extends Jo
reduceOutputRecords = taskgroup.getCounterForName(
MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getCounter();
hdfsBytesRead = hdfsgroup.getCounterForName(
- MRPigStatsUtil.HDFS_BYTES_READ).getCounter();
+ MRPigStatsUtil.HDFS_BYTES_READ).getCounter();
hdfsBytesWritten = hdfsgroup.getCounterForName(
- MRPigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();
spillCount = counters.findCounter(
PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
.getCounter();
@@ -310,23 +308,23 @@ public final class MRJobStats extends Jo
while (iter.hasNext()) {
Counter cter = iter.next();
multiStoreCounters.put(cter.getName(), cter.getValue());
- }
-
+ }
+
Iterator<Counter> iter2 = multiloadgroup.iterator();
while (iter2.hasNext()) {
Counter cter = iter2.next();
multiInputCounters.put(cter.getName(), cter.getValue());
- }
-
- }
+ }
+
+ }
}
-
+
void addMapReduceStatistics(JobClient client, Configuration conf) {
TaskReport[] maps = null;
try {
maps = client.getMapTaskReports(jobId);
} catch (IOException e) {
- LOG.warn("Failed to get map task report", e);
+ LOG.warn("Failed to get map task report", e);
}
if (maps != null && maps.length > 0) {
int size = maps.length;
@@ -335,7 +333,7 @@ public final class MRJobStats extends Jo
long median = 0;
long total = 0;
long durations[] = new long[size];
-
+
for (int i = 0; i < maps.length; i++) {
TaskReport rpt = maps[i];
long duration = rpt.getFinishTime() - rpt.getStartTime();
@@ -345,7 +343,7 @@ public final class MRJobStats extends Jo
total += duration;
}
long avg = total / size;
-
+
median = calculateMedianValue(durations);
setMapStat(size, max, min, avg, median);
} else {
@@ -354,7 +352,7 @@ public final class MRJobStats extends Jo
setMapStat(m, -1, -1, -1, -1);
}
}
-
+
TaskReport[] reduces = null;
try {
reduces = client.getReduceTaskReports(jobId);
@@ -368,7 +366,7 @@ public final class MRJobStats extends Jo
long median = 0;
long total = 0;
long durations[] = new long[size];
-
+
for (int i = 0; i < reduces.length; i++) {
TaskReport rpt = reduces[i];
long duration = rpt.getFinishTime() - rpt.getStartTime();
@@ -387,32 +385,32 @@ public final class MRJobStats extends Jo
}
}
}
-
- void setAlias(MapReduceOper mro) {
+
+ void setAlias(MapReduceOper mro) {
MRScriptState ss = MRScriptState.get();
- annotate(ALIAS, ss.getAlias(mro));
+ annotate(ALIAS, ss.getAlias(mro));
annotate(ALIAS_LOCATION, ss.getAliasLocation(mro));
annotate(FEATURE, ss.getPigFeature(mro));
}
-
+
void addOutputStatistics() {
if (mapStores == null || reduceStores == null) {
LOG.warn("unable to get stores of the job");
return;
}
-
+
if (mapStores.size() + reduceStores.size() == 1) {
POStore sto = (mapStores.size() > 0) ? mapStores.get(0)
: reduceStores.get(0);
if (!sto.isTmpStore()) {
long records = (mapStores.size() > 0) ? mapOutputRecords
- : reduceOutputRecords;
+ : reduceOutputRecords;
OutputStats ds = new OutputStats(sto.getSFile().getFileName(),
hdfsBytesWritten, records, (state == JobState.SUCCESS));
ds.setPOStore(sto);
ds.setConf(conf);
outputs.add(ds);
-
+
if (state == JobState.SUCCESS) {
MRScriptState.get().emitOutputCompletedNotification(ds);
}
@@ -425,10 +423,10 @@ public final class MRJobStats extends Jo
for (POStore sto : reduceStores) {
if (sto.isTmpStore()) continue;
addOneOutputStats(sto);
- }
+ }
}
}
-
+
/**
* Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
* it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
@@ -474,32 +472,32 @@ public final class MRJobStats extends Jo
long bytes = getOutputSize(sto, conf);
String location = sto.getSFile().getFileName();
OutputStats ds = new OutputStats(location, bytes, records,
- (state == JobState.SUCCESS));
+ (state == JobState.SUCCESS));
ds.setPOStore(sto);
ds.setConf(conf);
outputs.add(ds);
-
+
if (state == JobState.SUCCESS) {
MRScriptState.get().emitOutputCompletedNotification(ds);
}
}
-
+
void addInputStatistics() {
if (loads == null) {
LOG.warn("unable to get inputs of the job");
return;
}
-
+
if (loads.size() == 1) {
- FileSpec fsp = loads.get(0);
+ FileSpec fsp = loads.get(0);
if (!MRPigStatsUtil.isTempFile(fsp.getFileName())) {
- long records = mapInputRecords;
+ long records = mapInputRecords;
InputStats is = new InputStats(fsp.getFileName(),
- hdfsBytesRead, records, (state == JobState.SUCCESS));
+ hdfsBytesRead, records, (state == JobState.SUCCESS));
is.setConf(conf);
if (isSampler()) is.markSampleInput();
if (isIndexer()) is.markIndexerInput();
- inputs.add(is);
+ inputs.add(is);
}
} else {
for (int i=0; i<loads.size(); i++) {
@@ -507,14 +505,14 @@ public final class MRJobStats extends Jo
if (MRPigStatsUtil.isTempFile(fsp.getFileName())) continue;
addOneInputStats(fsp.getFileName(), i);
}
- }
+ }
}
-
+
private void addOneInputStats(String fileName, int index) {
long records = -1;
Long n = multiInputCounters.get(
MRPigStatsUtil.getMultiInputsCounterName(fileName, index));
- if (n != null) {
+ if (n != null) {
records = n;
} else {
// the file could be empty
@@ -523,9 +521,9 @@ public final class MRJobStats extends Jo
LOG.warn("unable to get input counter for " + fileName);
}
}
- InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));
+ InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));
is.setConf(conf);
inputs.add(is);
}
-
+
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Mon Nov 11 21:50:50 2013
@@ -27,12 +27,14 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.classification.InterfaceAudience.Private;
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
@@ -48,41 +50,41 @@ import org.apache.pig.tools.pigstats.Job
*/
public class MRPigStatsUtil extends PigStatsUtil {
- public static final String MULTI_STORE_RECORD_COUNTER
+ public static final String MULTI_STORE_RECORD_COUNTER
= "Output records in ";
- public static final String MULTI_STORE_COUNTER_GROUP
+ public static final String MULTI_STORE_COUNTER_GROUP
= "MultiStoreCounters";
- public static final String TASK_COUNTER_GROUP
+ public static final String TASK_COUNTER_GROUP
= "org.apache.hadoop.mapred.Task$Counter";
- public static final String FS_COUNTER_GROUP
+ public static final String FS_COUNTER_GROUP
= HadoopShims.getFsCounterGroupName();
- public static final String MAP_INPUT_RECORDS
+ public static final String MAP_INPUT_RECORDS
= "MAP_INPUT_RECORDS";
- public static final String MAP_OUTPUT_RECORDS
+ public static final String MAP_OUTPUT_RECORDS
= "MAP_OUTPUT_RECORDS";
- public static final String REDUCE_INPUT_RECORDS
+ public static final String REDUCE_INPUT_RECORDS
= "REDUCE_INPUT_RECORDS";
- public static final String REDUCE_OUTPUT_RECORDS
+ public static final String REDUCE_OUTPUT_RECORDS
= "REDUCE_OUTPUT_RECORDS";
- public static final String HDFS_BYTES_WRITTEN
+ public static final String HDFS_BYTES_WRITTEN
= "HDFS_BYTES_WRITTEN";
- public static final String HDFS_BYTES_READ
+ public static final String HDFS_BYTES_READ
= "HDFS_BYTES_READ";
- public static final String MULTI_INPUTS_RECORD_COUNTER
+ public static final String MULTI_INPUTS_RECORD_COUNTER
= "Input records from ";
- public static final String MULTI_INPUTS_COUNTER_GROUP
+ public static final String MULTI_INPUTS_COUNTER_GROUP
= "MultiInputCounters";
-
+
private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
-
+
// Restrict total string size of a counter name to 64 characters.
// Leave 24 characters for prefix string.
private static final int COUNTER_NAME_LIMIT = 40;
-
+
/**
- * Returns the count for the given counter name in the counter group
+ * Returns the count for the given counter name in the counter group
* 'MultiStoreCounters'
- *
+ *
* @param job the MR job
* @param jobClient the Hadoop job client
* @param counterName the counter name
@@ -102,37 +104,37 @@ public class MRPigStatsUtil extends PigS
} catch (IOException e) {
LOG.warn("Failed to get the counter for " + counterName, e);
}
- return value;
+ return value;
}
-
+
/**
* Returns the counter name for the given {@link POStore}
- *
+ *
* @param store the POStore
- * @return the counter name
+ * @return the counter name
*/
public static String getMultiStoreCounterName(POStore store) {
String shortName = getShortName(store.getSFile().getFileName());
- return (shortName == null) ? null
+ return (shortName == null) ? null
: MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
}
-
+
/**
* Returns the counter name for the given input file name
- *
+ *
* @param fname the input file name
* @return the counter name
*/
public static String getMultiInputsCounterName(String fname, int index) {
- String shortName = getShortName(fname);
- return (shortName == null) ? null
+ String shortName = getShortName(fname);
+ return (shortName == null) ? null
: MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
}
-
+
private static final String SEPARATOR = "/";
private static final String SEMICOLON = ";";
-
- private static String getShortName(String uri) {
+
+ private static String getShortName(String uri) {
int scolon = uri.indexOf(SEMICOLON);
int slash;
if (scolon!=-1) {
@@ -153,39 +155,39 @@ public class MRPigStatsUtil extends PigS
}
return shortName;
}
-
+
/**
* Starts collecting statistics for the given MR plan
- *
+ *
* @param pc the Pig context
* @param client the Hadoop job client
* @param jcc the job compiler
* @param plan the MR plan
*/
- public static void startCollection(PigContext pc, JobClient client,
+ public static void startCollection(PigContext pc, JobClient client,
JobControlCompiler jcc, MROperPlan plan) {
SimplePigStats ps = (SimplePigStats)PigStats.start(new SimplePigStats());
- ps.start(pc, client, jcc, plan);
+ ps.initialize(pc, client, jcc, plan);
MRScriptState.get().emitInitialPlanNotification(plan);
MRScriptState.get().emitLaunchStartedNotification(plan.size());
}
-
+
/**
* Stops collecting statistics for a MR plan
- *
- * @param display if true, log collected statistics in the Pig log
- * file at INFO level
+ *
+ * @param display if true, log collected statistics in the Pig log
+ * file at INFO level
*/
public static void stopCollection(boolean display) {
SimplePigStats ps = (SimplePigStats)PigStats.get();
- ps.stop();
+ ps.finish();
if (!ps.isSuccessful()) {
LOG.error(ps.getNumberFailedJobs() + " map reduce job(s) failed!");
String errMsg = ps.getErrorMessage();
if (errMsg != null) {
LOG.error("Error message: " + errMsg);
- }
+ }
}
MRScriptState.get().emitLaunchCompletedNotification(
ps.getNumberSuccessfulJobs());
@@ -209,13 +211,13 @@ public class MRPigStatsUtil extends PigS
public static void displayStatistics() {
((SimplePigStats)PigStats.get()).display();
}
-
+
/**
- * Updates the {@link JobGraph} of the {@link PigStats}. The initial
- * {@link JobGraph} is created without job ids using {@link MROperPlan},
+ * Updates the {@link JobGraph} of the {@link PigStats}. The initial
+ * {@link JobGraph} is created without job ids using {@link MROperPlan},
* before any job is submitted for execution. The {@link JobGraph} then
- * is updated with job ids after jobs are executed.
- *
+ * is updated with job ids after jobs are executed.
+ *
* @param jobMroMap the map that maps {@link Job}s to {@link MapReduceOper}s
*/
public static void updateJobMroMap(Map<Job, MapReduceOper> jobMroMap) {
@@ -223,58 +225,62 @@ public class MRPigStatsUtil extends PigS
for (Map.Entry<Job, MapReduceOper> entry : jobMroMap.entrySet()) {
MapReduceOper mro = entry.getValue();
ps.mapMROperToJob(mro, entry.getKey());
- }
+ }
}
-
+
/**
* Updates the statistics after a patch of jobs is done
- *
+ *
* @param jc the job control
*/
public static void accumulateStats(JobControl jc) {
SimplePigStats ps = (SimplePigStats)PigStats.get();
MRScriptState ss = MRScriptState.get();
-
- for (Job job : jc.getSuccessfulJobs()) {
+
+ for (Job job : jc.getSuccessfulJobs()) {
MRJobStats js = addSuccessJobStats(ps, job);
if (js != null) {
ss.emitjobFinishedNotification(js);
}
}
-
- for (Job job : jc.getFailedJobs()) {
+
+ for (Job job : jc.getFailedJobs()) {
MRJobStats js = addFailedJobStats(ps, job);
if (js != null) {
- js.setErrorMsg(job.getMessage());
+ js.setErrorMsg(job.getMessage());
ss.emitJobFailedNotification(js);
- }
+ }
}
}
-
-
+
+ @Private
public static void setBackendException(Job job, Exception e) {
- ((SimplePigStats)PigStats.get()).setBackendException(job, e);
+ JobID jobId = job.getAssignedJobID();
+ if (jobId == null) {
+ return;
+ }
+ PigStats.get().setBackendException(jobId.toString(), e);
}
-
+
private static MRJobStats addFailedJobStats(SimplePigStats ps, Job job) {
if (ps.isJobSeen(job)) return null;
-
+
MRJobStats js = ps.addMRJobStats(job);
if (js == null) {
- LOG.warn("unable to add failed job stats");
- } else {
+ LOG.warn("unable to add failed job stats");
+ } else {
js.setSuccessful(false);
js.addOutputStatistics();
js.addInputStatistics();
}
return js;
}
-
+
public static MRJobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
boolean success) {
return addNativeJobStats(ps, mr, success, null);
}
-
+
public static MRJobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
boolean success, Exception e) {
if (ps.isEmbedded()) {
@@ -289,19 +295,19 @@ public class MRPigStatsUtil extends PigS
js.setBackendException(e);
}
return js;
- }
-
+ }
+
private static MRJobStats addSuccessJobStats(SimplePigStats ps, Job job) {
if (ps.isJobSeen(job)) return null;
MRJobStats js = ps.addMRJobStats(job);
if (js == null) {
LOG.warn("unable to add job stats");
- } else {
+ } else {
js.setSuccessful(true);
-
+
js.addMapReduceStatistics(ps.getJobClient(), job.getJobConf());
-
+
JobClient client = ps.getJobClient();
RunningJob rjob = null;
try {
@@ -310,14 +316,14 @@ public class MRPigStatsUtil extends PigS
LOG.warn("Failed to get running job", e);
}
if (rjob == null) {
- LOG.warn("Failed to get RunningJob for job "
- + job.getAssignedJobID());
- } else {
- js.addCounters(rjob);
+ LOG.warn("Failed to get RunningJob for job "
+ + job.getAssignedJobID());
+ } else {
+ js.addCounters(rjob);
}
-
+
js.addOutputStatistics();
-
+
js.addInputStatistics();
}
return js;
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Mon Nov 11 21:50:50 2013
@@ -18,25 +18,20 @@
package org.apache.pig.tools.pigstats.mapreduce;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.pig.ExecType;
-import org.apache.pig.PigException;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
@@ -48,45 +43,26 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
-import org.apache.pig.tools.pigstats.JobStats.JobState;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.JobStats;
/**
- * SimplePigStats encapsulates the statistics collected from a running script.
- * It includes status of the execution, the DAG of its MR jobs, as well as
- * information about outputs and inputs of the script.
+ * SimplePigStats encapsulates the statistics collected from a running script.
+ * It includes status of the execution, the DAG of its MR jobs, as well as
+ * information about outputs and inputs of the script.
*/
public final class SimplePigStats extends PigStats {
-
private static final Log LOG = LogFactory.getLog(SimplePigStats.class);
-
- private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-
- private PigContext pigContext;
-
+
private JobClient jobClient;
-
private JobControlCompiler jcc;
-
- private JobGraph jobPlan;
-
private Map<Job, MapReduceOper> jobMroMap;
-
private Map<MapReduceOper, MRJobStats> mroJobMap;
-
+
// successful jobs so far
private Set<Job> jobSeen = new HashSet<Job>();
-
- private Map<String, OutputStats> aliasOuputMap;
-
- private long startTime = -1;
- private long endTime = -1;
-
- private String userId;
-
+
/**
* This class builds the job DAG from a MR plan
*/
@@ -96,13 +72,13 @@ public final class SimplePigStats extend
super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
plan));
jobPlan = new JobGraph();
- mroJobMap = new HashMap<MapReduceOper, MRJobStats>();
+ mroJobMap = new HashMap<MapReduceOper, MRJobStats>();
}
-
+
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
MRJobStats js = new MRJobStats(
- mr.getOperatorKey().toString(), jobPlan);
+ mr.getOperatorKey().toString(), jobPlan);
jobPlan.add(js);
List<MapReduceOper> preds = getPlan().getPredecessors(mr);
if (preds != null) {
@@ -113,10 +89,10 @@ public final class SimplePigStats extend
}
}
}
- mroJobMap.put(mr, js);
- }
+ mroJobMap.put(mr, js);
+ }
}
-
+
@Override
public List<String> getAllErrorMessages() {
throw new UnsupportedOperationException();
@@ -131,83 +107,6 @@ public final class SimplePigStats extend
public boolean isEmbedded() {
return false;
}
-
- @Override
- public boolean isSuccessful() {
- return (getNumberJobs()==0 && returnCode==ReturnCode.UNKNOWN
- || returnCode == ReturnCode.SUCCESS);
- }
-
- @Override
- public Properties getPigProperties() {
- if (pigContext == null) return null;
- return pigContext.getProperties();
- }
-
- @Override
- public JobGraph getJobGraph() {
- return jobPlan;
- }
-
- @Override
- public List<String> getOutputLocations() {
- ArrayList<String> locations = new ArrayList<String>();
- for (OutputStats output : getOutputStats()) {
- locations.add(output.getLocation());
- }
- return Collections.unmodifiableList(locations);
- }
-
- @Override
- public List<String> getOutputNames() {
- ArrayList<String> names = new ArrayList<String>();
- for (OutputStats output : getOutputStats()) {
- names.add(output.getName());
- }
- return Collections.unmodifiableList(names);
- }
-
- @Override
- public long getNumberBytes(String location) {
- if (location == null) return -1;
- String name = new Path(location).getName();
- long count = -1;
- for (OutputStats output : getOutputStats()) {
- if (name.equals(output.getName())) {
- count = output.getBytes();
- break;
- }
- }
- return count;
- }
-
- @Override
- public long getNumberRecords(String location) {
- if (location == null) return -1;
- String name = new Path(location).getName();
- long count = -1;
- for (OutputStats output : getOutputStats()) {
- if (name.equals(output.getName())) {
- count = output.getNumberRecords();
- break;
- }
- }
- return count;
- }
-
- @Override
- public String getOutputAlias(String location) {
- if (location == null) return null;
- String name = new Path(location).getName();
- String alias = null;
- for (OutputStats output : getOutputStats()) {
- if (name.equals(output.getName())) {
- alias = output.getAlias();
- break;
- }
- }
- return alias;
- }
@Override
public long getSMMSpillCount() {
@@ -223,149 +122,52 @@ public final class SimplePigStats extend
public long getProactiveSpillCountObjects() {
Iterator<JobStats> it = jobPlan.iterator();
long ret = 0;
- while (it.hasNext()) {
+ while (it.hasNext()) {
ret += ((MRJobStats) it.next()).getProactiveSpillCountObjects();
}
return ret;
}
-
+
@Override
public long getProactiveSpillCountRecords() {
Iterator<JobStats> it = jobPlan.iterator();
long ret = 0;
- while (it.hasNext()) {
- ret += ((MRJobStats) it.next()).getProactiveSpillCountRecs();
- }
- return ret;
- }
-
- @Override
- public long getBytesWritten() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
- while (it.hasNext()) {
- long n = it.next().getBytesWritten();
- if (n > 0) ret += n;
- }
- return ret;
- }
-
- @Override
- public long getRecordWritten() {
- Iterator<JobStats> it = jobPlan.iterator();
- long ret = 0;
while (it.hasNext()) {
- long n = it.next().getRecordWrittern();
- if (n > 0) ret += n;
+ ret += ((MRJobStats) it.next()).getProactiveSpillCountRecs();
}
return ret;
}
-
- @Override
- public String getScriptId() {
- return ScriptState.get().getId();
- }
-
- @Override
- public String getFeatures() {
- return ScriptState.get().getScriptFeatures();
- }
-
- @Override
- public long getDuration() {
- return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
- }
-
- @Override
- public int getNumberJobs() {
- return jobPlan.size();
- }
-
- @Override
- public List<OutputStats> getOutputStats() {
- List<OutputStats> outputs = new ArrayList<OutputStats>();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- for (OutputStats os : iter.next().getOutputs()) {
- outputs.add(os);
- }
- }
- return Collections.unmodifiableList(outputs);
- }
-
- @Override
- public OutputStats result(String alias) {
- if (aliasOuputMap == null) {
- aliasOuputMap = new HashMap<String, OutputStats>();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- for (OutputStats os : iter.next().getOutputs()) {
- String a = os.getAlias();
- if (a == null || a.length() == 0) {
- LOG.warn("Output alias isn't avalable for " + os.getLocation());
- continue;
- }
- aliasOuputMap.put(a, os);
- }
- }
- }
- return aliasOuputMap.get(alias);
- }
-
- @Override
- public List<InputStats> getInputStats() {
- List<InputStats> inputs = new ArrayList<InputStats>();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- for (InputStats is : iter.next().getInputs()) {
- inputs.add(is);
- }
- }
- return Collections.unmodifiableList(inputs);
- }
-
- public SimplePigStats() {
- jobMroMap = new HashMap<Job, MapReduceOper>();
+
+ public SimplePigStats() {
+ jobMroMap = new HashMap<Job, MapReduceOper>();
jobPlan = new JobGraph();
}
-
- void start(PigContext pigContext, JobClient jobClient,
+
+ void initialize(PigContext pigContext, JobClient jobClient,
JobControlCompiler jcc, MROperPlan mrPlan) {
-
+ super.start();
+
if (pigContext == null || jobClient == null || jcc == null) {
LOG.warn("invalid params: " + pigContext + jobClient + jcc);
return;
}
-
+
this.pigContext = pigContext;
this.jobClient = jobClient;
- this.jcc = jcc;
-
- // build job DAG with job ids assigned to null
+ this.jcc = jcc;
+
+ // build job DAG with job ids assigned to null
try {
new JobGraphBuilder(mrPlan).visit();
} catch (VisitorException e) {
LOG.warn("unable to build job plan", e);
}
-
- startTime = System.currentTimeMillis();
- userId = System.getProperty("user.name");
- }
-
- void stop() {
- endTime = System.currentTimeMillis();
- int m = getNumberSuccessfulJobs();
- int n = getNumberFailedJobs();
-
- if (n == 0 && m > 0 && m == jobPlan.size()) {
- returnCode = ReturnCode.SUCCESS;
- } else if (m > 0 && m < jobPlan.size()) {
- returnCode = ReturnCode.PARTIAL_FAILURE;
- } else {
- returnCode = ReturnCode.FAILURE;
- }
}
-
+
+ void finish() {
+ super.stop();
+ }
+
boolean isInitialized() {
return startTime > 0;
}
@@ -374,37 +176,35 @@ public final class SimplePigStats extend
public JobClient getJobClient() {
return jobClient;
}
-
+
JobControlCompiler getJobControlCompiler() {
return jcc;
}
-
- @SuppressWarnings("deprecation")
+
MRJobStats addMRJobStats(Job job) {
MapReduceOper mro = jobMroMap.get(job);
-
+
if (mro == null) {
LOG.warn("unable to get MR oper for job: " + job.toString());
return null;
}
MRJobStats js = mroJobMap.get(mro);
-
+
JobID jobId = job.getAssignedJobID();
js.setId(jobId);
js.setAlias(mro);
js.setConf(job.getJobConf());
return js;
}
-
- @SuppressWarnings("deprecation")
- public MRJobStats addMRJobStatsForNative(NativeMapReduceOper mr) {
+
+ MRJobStats addMRJobStatsForNative(NativeMapReduceOper mr) {
MRJobStats js = mroJobMap.get(mr);
- js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber()));
+ js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber()));
js.setAlias(mr);
-
+
return js;
}
-
+
void display() {
if (returnCode == ReturnCode.UNKNOWN) {
LOG.warn("unknown return code, can't display the results");
@@ -414,13 +214,13 @@ public final class SimplePigStats extend
LOG.warn("unknown exec type, don't display the results");
return;
}
-
+
// currently counters are not working in local mode - see PIG-1286
ExecType execType = pigContext.getExecType();
if (execType.isLocal()) {
LOG.info("Detected Local mode. Stats reported below may be incomplete");
}
-
+
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
StringBuilder sb = new StringBuilder();
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
@@ -438,9 +238,9 @@ public final class SimplePigStats extend
sb.append("Failed!\n");
}
sb.append("\n");
-
- if (returnCode == ReturnCode.SUCCESS
- || returnCode == ReturnCode.PARTIAL_FAILURE) {
+
+ if (returnCode == ReturnCode.SUCCESS
+ || returnCode == ReturnCode.PARTIAL_FAILURE) {
sb.append("Job Stats (time in seconds):\n");
if (execType.isLocal()) {
sb.append(MRJobStats.SUCCESS_HEADER_LOCAL).append("\n");
@@ -448,7 +248,7 @@ public final class SimplePigStats extend
sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
}
List<JobStats> arr = jobPlan.getSuccessfulJobs();
- for (JobStats js : arr) {
+ for (JobStats js : arr) {
sb.append(js.getDisplayString(execType.isLocal()));
}
sb.append("\n");
@@ -458,7 +258,7 @@ public final class SimplePigStats extend
sb.append("Failed Jobs:\n");
sb.append(MRJobStats.FAILURE_HEADER).append("\n");
List<JobStats> arr = jobPlan.getFailedJobs();
- for (JobStats js : arr) {
+ for (JobStats js : arr) {
sb.append(js.getDisplayString(execType.isLocal()));
}
sb.append("\n");
@@ -472,24 +272,24 @@ public final class SimplePigStats extend
for (OutputStats ds : getOutputStats()) {
sb.append(ds.getDisplayString(execType.isLocal()));
}
-
+
if (!(execType.isLocal())) {
sb.append("\nCounters:\n");
sb.append("Total records written : " + getRecordWritten()).append("\n");
sb.append("Total bytes written : " + getBytesWritten()).append("\n");
sb.append("Spillable Memory Manager spill count : "
+ getSMMSpillCount()).append("\n");
- sb.append("Total bags proactively spilled: "
+ sb.append("Total bags proactively spilled: "
+ getProactiveSpillCountObjects()).append("\n");
- sb.append("Total records proactively spilled: "
+ sb.append("Total records proactively spilled: "
+ getProactiveSpillCountRecords()).append("\n");
}
-
+
sb.append("\nJob DAG:\n").append(jobPlan.toString());
-
+
LOG.info("Script Statistics: \n" + sb.toString());
}
-
+
void mapMROperToJob(MapReduceOper mro, Job job) {
if (mro == null) {
LOG.warn("null MR operator");
@@ -501,55 +301,10 @@ public final class SimplePigStats extend
jobMroMap.put(job, mro);
}
}
- }
-
- void setBackendException(Job job, Exception e) {
- if (e instanceof PigException) {
- LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": "
- + e.getLocalizedMessage());
- } else if (e != null) {
- LOG.error("ERROR: " + e.getLocalizedMessage());
- }
-
- if (job.getAssignedJobID() == null || e == null) {
- LOG.debug("unable to set backend exception");
- return;
- }
- String id = job.getAssignedJobID().toString();
- Iterator<JobStats> iter = jobPlan.iterator();
- while (iter.hasNext()) {
- JobStats js = iter.next();
- if (id.equals(js.getJobId())) {
- js.setBackendException(e);
- break;
- }
- }
- }
-
- PigContext getPigContext() {
- return pigContext;
- }
-
- int getNumberSuccessfulJobs() {
- Iterator<JobStats> iter = jobPlan.iterator();
- int count = 0;
- while (iter.hasNext()) {
- if (iter.next().getState() == JobState.SUCCESS) count++;
- }
- return count;
- }
-
- int getNumberFailedJobs() {
- Iterator<JobStats> iter = jobPlan.iterator();
- int count = 0;
- while (iter.hasNext()) {
- if (iter.next().getState() == JobState.FAILED) count++;
- }
- return count;
}
-
+
boolean isJobSeen(Job job) {
- return !jobSeen.add(job);
+ return !jobSeen.add(job);
}
-
+
}
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Mon Nov 11 21:50:50 2013
@@ -582,16 +582,10 @@ public class TestPigRunner {
String[] args = { "-Dpig.additional.jars=pig-withouthadoop.jar",
"-Dmapred.job.queue.name=default",
"-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
Util.deleteFile(cluster, OUTPUT_FILE);
-
- java.lang.reflect.Method getPigContext = stats.getClass()
- .getDeclaredMethod("getPigContext");
-
- getPigContext.setAccessible(true);
-
- PigContext ctx = (PigContext) getPigContext.invoke(stats);
+ PigContext ctx = stats.getPigContext();
Assert.assertNotNull(ctx);