You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/30 22:04:31 UTC
svn commit: r1519062 [2/5] - in /pig/trunk: ./ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ s...
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Aug 30 20:04:29 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -106,6 +105,7 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
/**
* This is compiler class that takes an MROperPlan and converts
@@ -167,7 +167,7 @@ public class JobControlCompiler{
private Map<Job, MapReduceOper> jobMroMap;
private int counterSize;
- public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+ public JobControlCompiler(PigContext pigContext, Configuration conf) {
this.pigContext = pigContext;
this.conf = conf;
jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
@@ -270,7 +270,7 @@ public class JobControlCompiler{
this.plan = plan;
int timeToSleep;
- String defaultPigJobControlSleep = pigContext.getExecType() == ExecType.LOCAL ? "100" : "5000";
+ String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "5000";
String pigJobControlSleep = conf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
if (!pigJobControlSleep.equals(defaultPigJobControlSleep)) {
log.info("overriding default JobControl sleep (" + defaultPigJobControlSleep + ") to " + pigJobControlSleep);
@@ -438,7 +438,7 @@ public class JobControlCompiler{
// add settings for pig statistics
String setScriptProp = conf.get(ScriptState.INSERT_ENABLED, "true");
if (setScriptProp.equalsIgnoreCase("true")) {
- ScriptState ss = ScriptState.get();
+ MRScriptState ss = MRScriptState.get();
ss.addSettingsToConf(mro, conf);
}
@@ -501,7 +501,7 @@ public class JobControlCompiler{
}
}
- if (!pigContext.inIllustrator && pigContext.getExecType() != ExecType.LOCAL)
+ if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
{
// Setup the DistributedCache for this job
@@ -838,7 +838,7 @@ public class JobControlCompiler{
}
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);
- Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
+ Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList<Job>());
jobStoreMap.put(cjob,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
return cjob;
@@ -1426,7 +1426,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added by Hadoop team.
- if (pigContext.getExecType() != ExecType.LOCAL) {
+ if (!pigContext.getExecType().isLocal()) {
symlink = prefix + "_"
+ Integer.toString(System.identityHashCode(filename)) + "_"
+ Long.toString(System.currentTimeMillis());
@@ -1477,6 +1477,7 @@ public class JobControlCompiler{
* @return the path as seen on distributed cache
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
private static void putJarOnClassPathThroughDistributedCache(
PigContext pigContext,
Configuration conf,
@@ -1542,7 +1543,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
+ if (pigContext.getExecType().isLocal()) return;
// set up distributed cache for the replicated files
FileSpec[] replFiles = join.getReplFiles();
@@ -1583,7 +1584,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
+ if (pigContext.getExecType().isLocal()) return;
String indexFile = join.getIndexFile();
@@ -1607,7 +1608,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
+ if (pigContext.getExecType().isLocal()) return;
String indexFile = mergeCoGrp.getIndexFileName();
@@ -1644,7 +1645,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType() == ExecType.LOCAL) return;
+ if (pigContext.getExecType().isLocal()) return;
// set up distributed cache for files indicated by the UDF
String[] files = func.getCacheFiles();
@@ -1688,8 +1689,6 @@ public class JobControlCompiler{
}
}
}
-
- boolean isReplaced() { return replaced; }
}
}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * LocalExecType is the ExecType for local mode in Hadoop MapReduce.
+ * <p>
+ * It is selected when the {@code exectype} property is {@code local},
+ * {@code mapreduce_local} or {@code mapred_local} (compared ignoring case),
+ * and its execution engine is an {@link MRExecutionEngine}.
+ */
+public class LocalExecType implements ExecType {
+
+    // PigContext implements Serializable and keeps its ExecType in a
+    // non-transient field, so pin the serialized form (as MRExecType does).
+    private static final long serialVersionUID = 1L;
+
+    /** Values of the "exectype" property (compared ignoring case) accepted here. */
+    private static final String[] modes = { "LOCAL", "MAPREDUCE_LOCAL",
+            "MAPRED_LOCAL" };
+
+    /**
+     * Returns true if the "exectype" property selects local MapReduce mode.
+     * The match ignores case and, unlike String#toUpperCase(), does not
+     * depend on the JVM default locale. A missing property never matches.
+     */
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "");
+        for (String mode : modes) {
+            if (mode.equalsIgnoreCase(execTypeSpecified)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Creates a new {@link MRExecutionEngine} for the given context. */
+    @Override
+    public ExecutionEngine getExecutionEngine(PigContext pigContext) {
+        return new MRExecutionEngine(pigContext);
+    }
+
+    @Override
+    public Class<? extends ExecutionEngine> getExecutionEngineClass() {
+        return MRExecutionEngine.class;
+    }
+
+    /** Always true: this is the local-mode MapReduce exec type. */
+    @Override
+    public boolean isLocal() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return "LOCAL";
+    }
+
+    @Override
+    public String toString() {
+        return name();
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * MRExecType is the ExecType for distributed mode in Hadoop MapReduce.
+ * <p>
+ * It is selected when the {@code exectype} property is {@code mapreduce} or
+ * {@code mapred} (compared ignoring case), and its execution engine is an
+ * {@link MRExecutionEngine}.
+ */
+public class MRExecType implements ExecType {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Values of the "exectype" property (compared ignoring case) accepted here. */
+    private static final String[] modes = { "MAPREDUCE", "MAPRED" };
+
+    /**
+     * Returns true if the "exectype" property selects distributed MapReduce.
+     * The match ignores case and, unlike String#toUpperCase(), does not
+     * depend on the JVM default locale. A missing property never matches.
+     */
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "");
+        for (String mode : modes) {
+            if (mode.equalsIgnoreCase(execTypeSpecified)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Creates a new {@link MRExecutionEngine} for the given context. */
+    @Override
+    public ExecutionEngine getExecutionEngine(PigContext pigContext) {
+        return new MRExecutionEngine(pigContext);
+    }
+
+    @Override
+    public Class<? extends ExecutionEngine> getExecutionEngineClass() {
+        return MRExecutionEngine.class;
+    }
+
+    /** Always false: this is the distributed (non-local) MapReduce exec type. */
+    @Override
+    public boolean isLocal() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "MAPREDUCE";
+    }
+
+    @Override
+    public String toString() {
+        return name();
+    }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Aug 30 20:04:29 2013
@@ -43,9 +43,11 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.PigWarning;
+import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
@@ -67,9 +69,11 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
/**
@@ -94,29 +98,39 @@ public class MapReduceLauncher extends L
private JobControl jc=null;
- private class HangingJobKiller extends Thread {
- public HangingJobKiller() {
- }
- @Override
- public void run() {
- try {
- log.debug("Receive kill signal");
- if (jc!=null) {
- for (Job job : jc.getRunningJobs()) {
- RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
- if (runningJob!=null)
- runningJob.killJob();
- log.info("Job " + job.getJobID() + " killed");
- }
+ /**
+ * Best-effort kill of every job the current JobControl reports as running.
+ * Exceptions are logged as warnings rather than propagated.
+ */
+ public void kill() {
+ try {
+ log.debug("Receive kill signal");
+ if (jc!=null) {
+ for (Job job : jc.getRunningJobs()) {
+ RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
+ if (runningJob != null) {
+ runningJob.killJob();
+ // Only report jobs that were actually found and killed.
+ log.info("Job " + job.getJobID() + " killed");
+ }
}
- } catch (Exception e) {
- log.warn("Encounter exception on cleanup:" + e);
}
+ } catch (Exception e) {
+ log.warn("Encounter exception on cleanup:" + e);
}
}
-
- public MapReduceLauncher() {
- Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
+
+ /**
+ * Requests that the Hadoop job with the given id be killed. Does nothing
+ * when jobConf is null; prints a message to stdout when JobClient#getJob
+ * finds no such job.
+ *
+ * @param jobID the job id string, parsed with JobID.forName
+ * @param jobConf configuration used to build the JobClient; may be null
+ * @throws BackendException wrapping any IOException raised while talking to the JobClient
+ */
+ public void killJob(String jobID, JobConf jobConf) throws BackendException {
+ if (jobConf == null) {
+ return;
+ }
+ try {
+ JobClient client = new JobClient(jobConf);
+ JobID id = JobID.forName(jobID);
+ RunningJob job = client.getJob(id);
+ if (job != null) {
+ job.killJob();
+ log.info("Kill " + id + " submitted.");
+ } else {
+ System.out.println("Job with id " + jobID + " is not active");
+ }
+ } catch (IOException e) {
+ throw new BackendException(e);
+ }
}
/**
@@ -150,15 +164,15 @@ public class MapReduceLauncher extends L
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- HExecutionEngine exe = pc.getExecutionEngine();
+ MRExecutionEngine exe = (MRExecutionEngine) pc.getExecutionEngine();
JobClient jobClient = new JobClient(exe.getJobConf());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
- ScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
-
+ MRScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
+
// start collecting statistics
- PigStatsUtil.startCollection(pc, jobClient, jcc, mrp);
+ MRPigStatsUtil.startCollection(pc, jobClient, jcc, mrp);
// Find all the intermediate data stores. The plan will be destroyed during compile/execution
// so this needs to be done before.
@@ -193,7 +207,7 @@ public class MapReduceLauncher extends L
if(mro instanceof NativeMapReduceOper) {
NativeMapReduceOper natOp = (NativeMapReduceOper)mro;
try {
- ScriptState.get().emitJobsSubmittedNotification(1);
+ MRScriptState.get().emitJobsSubmittedNotification(1);
natOp.runJob();
numMRJobsCompl++;
} catch (IOException e) {
@@ -232,17 +246,17 @@ public class MapReduceLauncher extends L
List<Job> jobsWithoutIds = jc.getWaitingJobs();
log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
//notify listeners about jobs submitted
- ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
+ MRScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
// update Pig stats' job DAG with just compiled jobs
- PigStatsUtil.updateJobMroMap(jcc.getJobMroMap());
+ MRPigStatsUtil.updateJobMroMap(jcc.getJobMroMap());
// determine job tracker url
String jobTrackerLoc;
JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
try {
String port = jobConf.get("mapred.job.tracker.http.address");
- String jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+ String jobTrackerAdd = jobConf.get(MRExecutionEngine.JOB_TRACKER_LOCATION);
jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"))
+ port.substring(port.indexOf(":"));
@@ -305,9 +319,9 @@ public class MapReduceLauncher extends L
// display the aliases being processed
MapReduceOper mro = jcc.getJobMroMap().get(job);
if (mro != null) {
- String alias = ScriptState.get().getAlias(mro);
+ String alias = MRScriptState.get().getAlias(mro);
log.info("Processing aliases " + alias);
- String aliasLocation = ScriptState.get().getAliasLocation(mro);
+ String aliasLocation = MRScriptState.get().getAliasLocation(mro);
log.info("detailed locations: " + aliasLocation);
}
@@ -318,8 +332,8 @@ public class MapReduceLauncher extends L
}
// update statistics for this job so jobId is set
- PigStatsUtil.addJobStats(job);
- ScriptState.get().emitJobStartedNotification(
+ MRPigStatsUtil.addJobStats(job);
+ MRScriptState.get().emitJobStartedNotification(
job.getAssignedJobID().toString());
}
else{
@@ -334,7 +348,7 @@ public class MapReduceLauncher extends L
}
// collect job stats by frequently polling of completed jobs (PIG-1829)
- PigStatsUtil.accumulateStats(jc);
+ MRPigStatsUtil.accumulateStats(jc);
// if stop_on_failure is enabled, we need to stop immediately when any job has failed
checkStopOnFailure(stop_on_failure);
@@ -389,7 +403,7 @@ public class MapReduceLauncher extends L
succJobs.addAll(jobs);
// collecting final statistics
- PigStatsUtil.accumulateStats(jc);
+ MRPigStatsUtil.accumulateStats(jc);
}
catch (Exception e) {
@@ -400,7 +414,7 @@ public class MapReduceLauncher extends L
}
}
- ScriptState.get().emitProgressUpdatedNotification(100);
+ MRScriptState.get().emitProgressUpdatedNotification(100);
log.info( "100% complete");
@@ -429,13 +443,13 @@ public class MapReduceLauncher extends L
for (POStore st: sts) {
failureMap.put(st.getSFile(), backendException);
}
- PigStatsUtil.setBackendException(fj, backendException);
+ MRPigStatsUtil.setBackendException(fj, backendException);
}
failed = true;
}
// stats collection is done, log the results
- PigStatsUtil.stopCollection(true);
+ MRPigStatsUtil.stopCollection(true);
// PigStatsUtil.stopCollection also computes the return code based on
// total jobs to run, jobs successful and jobs failed
@@ -488,7 +502,30 @@ public class MapReduceLauncher extends L
? ReturnCode.PARTIAL_FAILURE
: ReturnCode.FAILURE)
: ReturnCode.SUCCESS;
- return PigStatsUtil.getPigStats(ret);
+
+ PigStats pigStats = PigStatsUtil.getPigStats(ret);
+ // run cleanup for all of the stores
+ for (OutputStats output : pigStats.getOutputStats()) {
+ POStore store = output.getPOStore();
+ try {
+ if (!output.isSuccessful()) {
+ store.getStoreFunc().cleanupOnFailure(
+ store.getSFile().getFileName(),
+ new org.apache.hadoop.mapreduce.Job(output.getConf()));
+ } else {
+ store.getStoreFunc().cleanupOnSuccess(
+ store.getSFile().getFileName(),
+ new org.apache.hadoop.mapreduce.Job(output.getConf()));
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ } catch (AbstractMethodError nsme) {
+ // Just swallow it. This means we're running against an
+ // older instance of a StoreFunc that doesn't implement
+ // this method.
+ }
+ }
+ return pigStats;
}
/**
@@ -534,7 +571,7 @@ public class MapReduceLauncher extends L
int perCom = (int)(prog * 100);
if(perCom!=100) {
log.info( perCom + "% complete");
- ScriptState.get().emitProgressUpdatedNotification(perCom);
+ MRScriptState.get().emitProgressUpdatedNotification(perCom);
}
return true;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri Aug 30 20:04:29 2013
@@ -29,8 +29,7 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
* This class is used to have a POStore write to DFS via a output
@@ -117,8 +116,8 @@ public class MapReducePOStoreImpl extend
}
+ /**
+ * Returns the record counter for the given store in the
+ * MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP group, or null when
+ * MRPigStatsUtil.getMultiStoreCounterName returns null for the store.
+ */
public Counter createRecordCounter(POStore store) {
- String name = PigStatsUtil.getMultiStoreCounterName(store);
+ String name = MRPigStatsUtil.getMultiStoreCounterName(store);
return (name == null) ? null : reporter.getCounter(
- PigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java Fri Aug 30 20:04:29 2013
@@ -24,7 +24,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
public class NativeMapReduceOper extends MapReduceOper {
@@ -77,20 +77,20 @@ public class NativeMapReduceOper extends
RunJarSecurityManager secMan = new RunJarSecurityManager();
try {
RunJar.main(getNativeMRParams());
- PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
+ MRPigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
} catch (SecurityException se) {
if(secMan.getExitInvoked()) {
if(secMan.getExitCode() != 0) {
throw new JobCreationException("Native job returned with non-zero return code");
}
else {
- PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
+ MRPigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
}
}
} catch (Throwable t) {
JobCreationException e = new JobCreationException(
"Cannot run native mapreduce job "+ t.getMessage(), t);
- PigStatsUtil.addNativeJobStats(PigStats.get(), this, false, e);
+ MRPigStatsUtil.addNativeJobStats(PigStats.get(), this, false, e);
throw e;
} finally {
secMan.retire();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Aug 30 20:04:29 2013
@@ -240,7 +240,7 @@ public class PigInputFormat extends Inpu
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
- if(pigContext.getExecType() == ExecType.MAPREDUCE) {
+ if(!pigContext.getExecType().isLocal()) {
fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Aug 30 20:04:29 2013
@@ -36,7 +36,7 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -146,7 +146,7 @@ public class PigRecordReader extends Rec
PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null) {
inputRecordCounter = reporter.getCounter(
- PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
counterName);
LOG.info("Created input record counter: " + counterName);
} else {
@@ -228,7 +228,7 @@ public class PigRecordReader extends Rec
(ArrayList<FileSpec>) ObjectSerializer.deserialize(
conf.get(PigInputFormat.PIG_INPUTS));
String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
- return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
+ return MRPigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Aug 30 20:04:29 2013
@@ -111,7 +111,7 @@ public class WeightedRangePartitioner ex
try{
// use local file system to get the quantilesFile
Configuration conf;
- if (pigContext.getExecType()==ExecType.MAPREDUCE) {
+ if (!pigContext.getExecType().isLocal()) {
conf = new Configuration(true);
} else {
conf = new Configuration(false);
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Fri Aug 30 20:04:29 2013
@@ -265,7 +265,7 @@ public class SchemaTupleBackend {
private static SchemaTupleBackend stb;
+ /**
+ * Convenience overload that takes the local/distributed decision from
+ * pigContext.getExecType().isLocal().
+ */
public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
- initialize(jConf, pigContext, pigContext.getExecType() == ExecType.LOCAL);
+ initialize(jConf, pigContext, pigContext.getExecType().isLocal());
}
public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java Fri Aug 30 20:04:29 2013
@@ -110,8 +110,8 @@ public class SchemaTupleFrontend {
* @param conf
*/
private void internalCopyAllGeneratedToDistributedCache() {
- LOG.info("Starting process to move generated code to distributed cache");
- if (pigContext.getExecType() == ExecType.LOCAL) {
+ LOG.info("Starting process to move generated code to distributed cacche");
+ if (pigContext.getExecType().isLocal()) {
String codePath = codeDir.getAbsolutePath();
LOG.info("Distributed cache not supported or needed in local mode. Setting key ["
+ LOCAL_CODE_DIR + "] with code temp directory: " + codePath);
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Aug 30 20:04:29 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
import org.apache.pig.FuncSpec;
import org.apache.pig.Main;
import org.apache.pig.PigException;
@@ -58,10 +59,9 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.JarManager;
@@ -86,8 +86,8 @@ public class PigContext implements Seria
* and also because some is not serializable e.g. the Configuration)
*/
- //one of: local, mapreduce, pigbody
- private ExecType execType;;
+ //one of: local, mapreduce, or a custom exec type for a different execution engine
+ private ExecType execType;
//main file system that jobs and shell commands access
transient private DataStorage dfs;
@@ -96,7 +96,7 @@ public class PigContext implements Seria
transient private DataStorage lfs;
// handle to the back-end
- transient private HExecutionEngine executionEngine;
+ transient private ExecutionEngine executionEngine;
private Properties properties;
@@ -232,6 +232,14 @@ public class PigContext implements Seria
+ /** Creates a context with ExecType.MAPREDUCE and an empty Properties object. */
public PigContext() {
this(ExecType.MAPREDUCE, new Properties());
}
+
+ /**
+ * Creates a context from a Hadoop Configuration by converting it to
+ * Properties (ConfigurationUtil.toProperties) and delegating to
+ * PigContext(Properties), which selects the ExecType from those properties.
+ *
+ * @throws PigException from exec type selection (presumably when no
+ * ExecType accepts the properties -- confirm in ExecTypeProvider)
+ */
+ public PigContext(Configuration conf) throws PigException {
+ this(ConfigurationUtil.toProperties(conf));
+ }
+
+ /**
+ * Creates a context whose ExecType is chosen by
+ * ExecTypeProvider.selectExecType(properties).
+ *
+ * @throws PigException if selectExecType fails (presumably when no
+ * registered ExecType accepts the properties -- confirm)
+ */
+ public PigContext(Properties properties) throws PigException {
+ this(ExecTypeProvider.selectExecType(properties), properties);
+ }
public PigContext(ExecType execType, Configuration conf) {
this(execType, ConfigurationUtil.toProperties(conf));
@@ -250,7 +258,7 @@ public class PigContext implements Seria
skipJars.add(hadoopJar);
}
- executionEngine = null;
+ this.executionEngine = execType.getExecutionEngine(this);
// Add the default paths to be skipped for auto-shipping of commands
skippedShipPaths.add("/bin");
@@ -291,42 +299,28 @@ public class PigContext implements Seria
}
public void connect() throws ExecException {
-
- switch (execType) {
- case LOCAL:
- case MAPREDUCE:
- {
- executionEngine = new HExecutionEngine (this);
-
executionEngine.init();
-
dfs = executionEngine.getDataStorage();
+ lfs = new HDataStorage(URI.create("file:///"), properties);
+ Runtime.getRuntime().addShutdownHook(new ExecutionEngineKiller());
- lfs = new HDataStorage(URI.create("file:///"),
- properties);
- }
- break;
+ }
- default:
- {
- int errCode = 2040;
- String msg = "Unkown exec type: " + execType;
- throw new ExecException(msg, errCode, PigException.BUG);
+ class ExecutionEngineKiller extends Thread {
+ public ExecutionEngineKiller() {}
+
+ @Override
+ public void run() {
+ try {
+ executionEngine.kill();
+ } catch (Exception e) {
+ log.warn("Error in killing Execution Engine: " + e);
}
}
-
}
public void setJobtrackerLocation(String newLocation) {
- Properties trackerLocation = new Properties();
- trackerLocation.setProperty("mapred.job.tracker", newLocation);
-
- try {
- executionEngine.updateConfiguration(trackerLocation);
- }
- catch (ExecException e) {
- log.error("Failed to set tracker at: " + newLocation);
- }
+ executionEngine.setProperty("mapred.job.tracker", newLocation);
}
/**
@@ -519,7 +513,7 @@ public class PigContext implements Seria
srcElement.copy(dstElement, this.properties, false);
}
- public HExecutionEngine getExecutionEngine() {
+ public ExecutionEngine getExecutionEngine() {
return executionEngine;
}
@@ -798,24 +792,12 @@ public class PigContext implements Seria
* @throws ExecException
*/
public ExecutableManager createExecutableManager() throws ExecException {
- ExecutableManager executableManager = null;
-
- switch (execType) {
- case LOCAL:
- case MAPREDUCE:
- {
- executableManager = new HadoopExecutableManager();
- }
- break;
- default:
- {
- int errCode = 2040;
- String msg = "Unkown exec type: " + execType;
- throw new ExecException(msg, errCode, PigException.BUG);
- }
+ if (executionEngine != null) {
+ return executionEngine.getExecutableManager();
}
-
- return executableManager;
+ // Fail fast (as the pre-refactoring code did) rather than returning a null
+ // manager that callers would only trip over later with a NullPointerException.
+ throw new ExecException("No execution engine available for exec type: " + execType, 2040, PigException.BUG);
}
public FuncSpec getFuncSpecFromAlias(String alias) {
Modified: pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Fri Aug 30 20:04:29 2013
@@ -30,10 +30,11 @@ import org.apache.pig.impl.util.Identity
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -66,7 +67,7 @@ public class ExampleGenerator {
PhysicalPlan physPlan;
PhysicalPlanResetter physPlanReseter;
- private HExecutionEngine execEngine;
+ private MRExecutionEngine execEngine;
private LocalMapReduceSimulator localMRRunner;
Log log = LogFactory.getLog(getClass());
@@ -98,7 +99,7 @@ public class ExampleGenerator {
+ e.getLocalizedMessage());
}
- execEngine = new HExecutionEngine(pigContext);
+ execEngine = new MRExecutionEngine(pigContext);
localMRRunner = new LocalMapReduceSimulator();
poLoadToSchemaMap = new HashMap<POLoad, LogicalSchema>();
}
Modified: pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java (original)
+++ pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java Fri Aug 30 20:04:29 2013
@@ -29,8 +29,6 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.PigContext;
@@ -151,10 +149,7 @@ public class ToolsPigServer extends PigS
*/
public List<ExecJob> runPlan(LogicalPlan newPlan,
String jobName) throws FrontendException, ExecException {
-
- HExecutionEngine engine = new HExecutionEngine(pigContext);
- PhysicalPlan pp = engine.compile(newPlan, null);
- PigStats stats = launchPlan(pp, jobName);
+ PigStats stats = launchPlan(newPlan, jobName);
return getJobs(stats);
}
Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Aug 30 20:04:29 2013
@@ -34,7 +34,6 @@ import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -47,12 +46,7 @@ import jline.ConsoleReaderInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -62,7 +56,6 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -228,15 +221,6 @@ public class GruntParser extends PigScri
mLfs = mPigServer.getPigContext().getLfs();
mConf = mPigServer.getPigContext().getProperties();
shell = new FsShell(ConfigurationUtil.toConfiguration(mConf));
-
- // TODO: this violates the abstraction layer decoupling between
- // front end and back end and needs to be changed.
- // Right now I am not clear on how the Job Id comes from to tell
- // the back end to kill a given job (mJobClient is used only in
- // processKill)
- //
- HExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine();
- mJobConf = execEngine.getJobConf();
}
public void setScriptIllustrate() {
@@ -398,7 +382,6 @@ public class GruntParser extends PigScri
protected void explainCurrentBatch(boolean dontPrintOutput) throws IOException {
PrintStream lp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
- PrintStream pp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
PrintStream ep = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
if (!(mExplain.mLast && mExplain.mCount == 0)) {
@@ -415,26 +398,25 @@ public class GruntParser extends PigScri
if (file.isDirectory()) {
String sCount = (mExplain.mLast && mExplain.mCount == 1)?"":"_"+mExplain.mCount;
- lp = new PrintStream(new File(file, "logical_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
- pp = new PrintStream(new File(file, "physical_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
- ep = new PrintStream(new File(file, "exec_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
+ String suffix = mExplain.mTime+sCount+"."+mExplain.mFormat;
+ lp = new PrintStream(new File(file, "logical_plan-"+suffix));
mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
- mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+ mExplain.mVerbose, markAsExecuted, lp, null, file, suffix);
lp.close();
- pp.close();
- ep.close();
+ // ep is no longer redirected to a file in this branch: it is still System.out
+ // (or a NullPrintStream), so closing it here would close System.out.
}
else {
boolean append = !(mExplain.mCount==1);
- lp = pp = ep = new PrintStream(new FileOutputStream(mExplain.mTarget, append));
+ lp = ep = new PrintStream(new FileOutputStream(mExplain.mTarget, append));
mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
- mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+ mExplain.mVerbose, markAsExecuted, lp, ep, null, null);
lp.close();
}
}
else {
mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
- mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+ mExplain.mVerbose, markAsExecuted, lp, ep, null, null);
}
}
@@ -603,33 +584,7 @@ public class GruntParser extends PigScri
}
else
{
- //mPigServer.getPigContext().getProperties().setProperty(key, value);
- // PIG-2508 properties need to be managed through JobConf
- // since all other code depends on access to properties,
- // we need to re-populate from updated JobConf
- //java.util.HashSet<?> keysBefore = new java.util.HashSet<Object>(mPigServer.getPigContext().getProperties().keySet());
- // set current properties on jobConf
- Properties properties = mPigServer.getPigContext().getProperties();
- Configuration jobConf = mPigServer.getPigContext().getExecutionEngine().getJobConf();
- Enumeration<Object> propertiesIter = properties.keys();
- while (propertiesIter.hasMoreElements()) {
- String pkey = (String) propertiesIter.nextElement();
- String val = properties.getProperty(pkey);
- // We do not put user.name, See PIG-1419
- if (!pkey.equals("user.name"))
- jobConf.set(pkey, val);
- }
- // set new value, JobConf will handle deprecation etc.
- jobConf.set(key, value);
- // re-initialize to reflect updated JobConf
- properties.clear();
- Iterator<Map.Entry<String, String>> iter = jobConf.iterator();
- while (iter.hasNext()) {
- Map.Entry<String, String> entry = iter.next();
- properties.put(entry.getKey(), entry.getValue());
- }
- //keysBefore.removeAll(mPigServer.getPigContext().getProperties().keySet());
- //log.info("PIG-2508: keys dropped from properties: " + keysBefore);
+ mPigServer.getPigContext().getExecutionEngine().setProperty(key, value);
}
}
@@ -812,18 +767,7 @@ public class GruntParser extends PigScri
@Override
protected void processKill(String jobid) throws IOException
{
- if (mJobConf != null) {
- JobClient jc = new JobClient(mJobConf);
- JobID id = JobID.forName(jobid);
- RunningJob job = jc.getJob(id);
- if (job == null)
- System.out.println("Job with id " + jobid + " is not active");
- else
- {
- job.killJob();
- log.info("Kill " + id + " submitted.");
- }
- }
+ mPigServer.getPigContext().getExecutionEngine().killJob(jobid);
}
@Override
@@ -1292,7 +1236,6 @@ public class GruntParser extends PigScri
private DataStorage mDfs;
private DataStorage mLfs;
private Properties mConf;
- private JobConf mJobConf;
private boolean mDone;
private boolean mLoadOnly;
private ExplainState mExplain;
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java Fri Aug 30 20:04:29 2013
@@ -43,7 +43,7 @@ public final class InputStats {
private Configuration conf;
- InputStats(String location, long bytes, long records, boolean success) {
+ public InputStats(String location, long bytes, long records, boolean success) {
this.location = location;
this.bytes = bytes;
this.records = records;
@@ -84,7 +84,7 @@ public final class InputStats {
return type;
}
- String getDisplayString(boolean local) {
+ public String getDisplayString(boolean local) {
StringBuilder sb = new StringBuilder();
if (success) {
sb.append("Successfully ");
@@ -116,19 +116,19 @@ public final class InputStats {
return sb.toString();
}
- void setConf(Configuration conf) {
+ public void setConf(Configuration conf) {
this.conf = conf;
}
- void markSampleInput() {
+ public void markSampleInput() {
type = INPUT_TYPE.sampler;
}
- void markIndexerInput() {
+ public void markIndexerInput() {
type = INPUT_TYPE.indexer;
}
- void markSideFileInput() {
+ public void markSideFileInput() {
type = INPUT_TYPE.side;
}
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Fri Aug 30 20:04:29 2013
@@ -18,41 +18,18 @@
package org.apache.pig.tools.pigstats;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.SimplePigStats.JobGraphPrinter;
/**
* This class encapsulates the runtime statistics of a MapReduce job.
@@ -60,131 +37,47 @@ import org.apache.pig.tools.pigstats.Sim
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public final class JobStats extends Operator {
+public abstract class JobStats extends Operator {
public static final String ALIAS = "JobStatistics:alias";
public static final String ALIAS_LOCATION = "JobStatistics:alias_location";
public static final String FEATURE = "JobStatistics:feature";
-
- public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
- "MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
- "MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs";
-
- public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
-
- // currently counters are not working in local mode - see PIG-1286
- public static final String SUCCESS_HEADER_LOCAL = "JobId\tAlias\tFeature\tOutputs";
-
+
private static final Log LOG = LogFactory.getLog(JobStats.class);
+ public static final String SUCCESS_HEADER = null;
+ public static final String FAILURE_HEADER = null;
public static enum JobState { UNKNOWN, SUCCESS, FAILED; }
- private JobState state = JobState.UNKNOWN;
+ protected JobState state = JobState.UNKNOWN;
- private Configuration conf;
-
- private List<POStore> mapStores = null;
+ protected ArrayList<OutputStats> outputs;
- private List<POStore> reduceStores = null;
-
- private List<FileSpec> loads = null;
-
- private ArrayList<OutputStats> outputs;
-
- private ArrayList<InputStats> inputs;
+ protected ArrayList<InputStats> inputs;
private String errorMsg;
private Exception exception = null;
-
- private Boolean disableCounter = false;
-
- @SuppressWarnings("deprecation")
- private JobID jobId;
-
- private long maxMapTime = 0;
- private long minMapTime = 0;
- private long avgMapTime = 0;
- private long medianMapTime = 0;
- private long maxReduceTime = 0;
- private long minReduceTime = 0;
- private long avgReduceTime = 0;
- private long medianReduceTime = 0;
-
- private int numberMaps = 0;
- private int numberReduces = 0;
-
- private long mapInputRecords = 0;
- private long mapOutputRecords = 0;
- private long reduceInputRecords = 0;
- private long reduceOutputRecords = 0;
- private long hdfsBytesWritten = 0;
- private long hdfsBytesRead = 0;
- private long spillCount = 0;
- private long activeSpillCountObj = 0;
- private long activeSpillCountRecs = 0;
-
- private HashMap<String, Long> multiStoreCounters
- = new HashMap<String, Long>();
-
- private HashMap<String, Long> multiInputCounters
- = new HashMap<String, Long>();
-
- @SuppressWarnings("deprecation")
- private Counters counters = null;
-
- JobStats(String name, JobGraph plan) {
+
+ protected JobStats(String name, JobGraph plan) {
super(name, plan);
outputs = new ArrayList<OutputStats>();
inputs = new ArrayList<InputStats>();
}
- public String getJobId() {
- return (jobId == null) ? null : jobId.toString();
- }
+ public abstract String getJobId();
public JobState getState() { return state; }
public boolean isSuccessful() { return (state == JobState.SUCCESS); }
-
- public String getErrorMessage() { return errorMsg; }
-
- public Exception getException() { return exception; }
-
- public int getNumberMaps() { return numberMaps; }
-
- public int getNumberReduces() { return numberReduces; }
-
- public long getMaxMapTime() { return maxMapTime; }
-
- public long getMinMapTime() { return minMapTime; }
-
- public long getAvgMapTime() { return avgMapTime; }
-
- public long getMaxReduceTime() { return maxReduceTime; }
-
- public long getMinReduceTime() { return minReduceTime; }
-
- public long getAvgREduceTime() { return avgReduceTime; }
-
- public long getMapInputRecords() { return mapInputRecords; }
-
- public long getMapOutputRecords() { return mapOutputRecords; }
-
- public long getReduceOutputRecords() { return reduceOutputRecords; }
- public long getReduceInputRecords() { return reduceInputRecords; }
+ public void setSuccessful(boolean isSuccessful) {
+ this.state = isSuccessful ? JobState.SUCCESS : JobState.FAILED;
+ }
- public long getSMMSpillCount() { return spillCount; }
-
- public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
-
- public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
-
- public long getHdfsBytesWritten() { return hdfsBytesWritten; }
+ public String getErrorMessage() { return errorMsg; }
- @SuppressWarnings("deprecation")
- public Counters getHadoopCounters() { return counters; }
+ public Exception getException() { return exception; }
public List<OutputStats> getOutputs() {
return Collections.unmodifiableList(outputs);
@@ -193,10 +86,6 @@ public final class JobStats extends Oper
public List<InputStats> getInputs() {
return Collections.unmodifiableList(inputs);
}
-
- public Map<String, Long> getMultiStoreCounters() {
- return Collections.unmodifiableMap(multiStoreCounters);
- }
public String getAlias() {
return (String)getAnnotation(ALIAS);
@@ -237,237 +126,32 @@ public final class JobStats extends Oper
}
@Override
- public void accept(PlanVisitor v) throws FrontendException {
- if (v instanceof JobGraphPrinter) {
- JobGraphPrinter jpp = (JobGraphPrinter)v;
- jpp.visit(this);
- }
- }
-
+ public abstract void accept(PlanVisitor v) throws FrontendException;
+
+
@Override
public boolean isEqual(Operator operator) {
if (!(operator instanceof JobStats)) return false;
return name.equalsIgnoreCase(operator.getName());
}
-
- @SuppressWarnings("deprecation")
- void setId(JobID jobId) {
- this.jobId = jobId;
- }
-
- void setSuccessful(boolean isSuccessful) {
- this.state = isSuccessful ? JobState.SUCCESS : JobState.FAILED;
- }
-
- void setErrorMsg(String errorMsg) {
+ public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
- void setBackendException(Exception e) {
+ public void setBackendException(Exception e) {
exception = e;
}
-
- @SuppressWarnings("unchecked")
- void setConf(Configuration conf) {
- if (conf == null) return;
- this.conf = conf;
- try {
- this.mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
- .get(JobControlCompiler.PIG_MAP_STORES));
- this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
- .get(JobControlCompiler.PIG_REDUCE_STORES));
- this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
- .get("pig.inputs"));
- this.disableCounter = conf.getBoolean("pig.disable.counter", false);
- } catch (IOException e) {
- LOG.warn("Failed to deserialize the store list", e);
- }
- }
-
- void setMapStat(int size, long max, long min, long avg, long median) {
- numberMaps = size;
- maxMapTime = max;
- minMapTime = min;
- avgMapTime = avg;
- medianMapTime = median;
- }
-
- void setReduceStat(int size, long max, long min, long avg, long median) {
- numberReduces = size;
- maxReduceTime = max;
- minReduceTime = min;
- avgReduceTime = avg;
- medianReduceTime = median;
- }
-
- String getDisplayString(boolean local) {
- StringBuilder sb = new StringBuilder();
- String id = (jobId == null) ? "N/A" : jobId.toString();
- if (state == JobState.FAILED || local) {
- sb.append(id).append("\t")
- .append(getAlias()).append("\t")
- .append(getFeature()).append("\t");
- if (state == JobState.FAILED) {
- sb.append("Message: ").append(getErrorMessage()).append("\t");
- }
- } else if (state == JobState.SUCCESS) {
- sb.append(id).append("\t")
- .append(numberMaps).append("\t")
- .append(numberReduces).append("\t");
- if (numberMaps == 0) {
- sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
- } else {
- sb.append(maxMapTime/1000).append("\t")
- .append(minMapTime/1000).append("\t")
- .append(avgMapTime/1000).append("\t")
- .append(medianMapTime/1000).append("\t");
- }
- if (numberReduces == 0) {
- sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
- } else {
- sb.append(maxReduceTime/1000).append("\t")
- .append(minReduceTime/1000).append("\t")
- .append(avgReduceTime/1000).append("\t")
- .append(medianReduceTime/1000).append("\t");
- }
- sb.append(getAlias()).append("\t")
- .append(getFeature()).append("\t");
- }
- for (OutputStats os : outputs) {
- sb.append(os.getLocation()).append(",");
- }
- sb.append("\n");
- return sb.toString();
- }
-
- @SuppressWarnings("deprecation")
- void addCounters(RunningJob rjob) {
- if (rjob != null) {
- try {
- counters = rjob.getCounters();
- } catch (IOException e) {
- LOG.warn("Unable to get job counters", e);
- }
- }
- if (counters != null) {
- Counters.Group taskgroup = counters
- .getGroup(PigStatsUtil.TASK_COUNTER_GROUP);
- Counters.Group hdfsgroup = counters
- .getGroup(PigStatsUtil.FS_COUNTER_GROUP);
- Counters.Group multistoregroup = counters
- .getGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- Counters.Group multiloadgroup = counters
- .getGroup(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
-
- mapInputRecords = taskgroup.getCounterForName(
- PigStatsUtil.MAP_INPUT_RECORDS).getCounter();
- mapOutputRecords = taskgroup.getCounterForName(
- PigStatsUtil.MAP_OUTPUT_RECORDS).getCounter();
- reduceInputRecords = taskgroup.getCounterForName(
- PigStatsUtil.REDUCE_INPUT_RECORDS).getCounter();
- reduceOutputRecords = taskgroup.getCounterForName(
- PigStatsUtil.REDUCE_OUTPUT_RECORDS).getCounter();
- hdfsBytesRead = hdfsgroup.getCounterForName(
- PigStatsUtil.HDFS_BYTES_READ).getCounter();
- hdfsBytesWritten = hdfsgroup.getCounterForName(
- PigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();
- spillCount = counters.findCounter(
- PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
- .getCounter();
- activeSpillCountObj = counters.findCounter(
- PigCounters.PROACTIVE_SPILL_COUNT_BAGS).getCounter();
- activeSpillCountRecs = counters.findCounter(
- PigCounters.PROACTIVE_SPILL_COUNT_RECS).getCounter();
-
- Iterator<Counter> iter = multistoregroup.iterator();
- while (iter.hasNext()) {
- Counter cter = iter.next();
- multiStoreCounters.put(cter.getName(), cter.getValue());
- }
-
- Iterator<Counter> iter2 = multiloadgroup.iterator();
- while (iter2.hasNext()) {
- Counter cter = iter2.next();
- multiInputCounters.put(cter.getName(), cter.getValue());
- }
-
- }
- }
-
- void addMapReduceStatistics(JobClient client, Configuration conf) {
- TaskReport[] maps = null;
- try {
- maps = client.getMapTaskReports(jobId);
- } catch (IOException e) {
- LOG.warn("Failed to get map task report", e);
- }
- if (maps != null && maps.length > 0) {
- int size = maps.length;
- long max = 0;
- long min = Long.MAX_VALUE;
- long median = 0;
- long total = 0;
- long durations[] = new long[size];
-
- for (int i = 0; i < maps.length; i++) {
- TaskReport rpt = maps[i];
- long duration = rpt.getFinishTime() - rpt.getStartTime();
- durations[i] = duration;
- max = (duration > max) ? duration : max;
- min = (duration < min) ? duration : min;
- total += duration;
- }
- long avg = total / size;
-
- median = calculateMedianValue(durations);
- setMapStat(size, max, min, avg, median);
- } else {
- int m = conf.getInt("mapred.map.tasks", 1);
- if (m > 0) {
- setMapStat(m, -1, -1, -1, -1);
- }
- }
-
- TaskReport[] reduces = null;
- try {
- reduces = client.getReduceTaskReports(jobId);
- } catch (IOException e) {
- LOG.warn("Failed to get reduce task report", e);
- }
- if (reduces != null && reduces.length > 0) {
- int size = reduces.length;
- long max = 0;
- long min = Long.MAX_VALUE;
- long median = 0;
- long total = 0;
- long durations[] = new long[size];
-
- for (int i = 0; i < reduces.length; i++) {
- TaskReport rpt = reduces[i];
- long duration = rpt.getFinishTime() - rpt.getStartTime();
- durations[i] = duration;
- max = (duration > max) ? duration : max;
- min = (duration < min) ? duration : min;
- total += duration;
- }
- long avg = total / size;
- median = calculateMedianValue(durations);
- setReduceStat(size, max, min, avg, median);
- } else {
- int m = conf.getInt("mapred.reduce.tasks", 1);
- if (m > 0) {
- setReduceStat(m, -1, -1, -1, -1);
- }
- }
- }
+
+ public abstract String getDisplayString(boolean isLocal);
+
/**
* Calculate the median value from the given array
* @param durations
* @return median value
*/
- private long calculateMedianValue(long[] durations) {
+ protected long calculateMedianValue(long[] durations) {
long median;
// figure out the median
Arrays.sort(durations);
@@ -482,150 +166,11 @@ public final class JobStats extends Oper
return median;
}
- void setAlias(MapReduceOper mro) {
- annotate(ALIAS, ScriptState.get().getAlias(mro));
- annotate(ALIAS_LOCATION, ScriptState.get().getAliasLocation(mro));
- annotate(FEATURE, ScriptState.get().getPigFeature(mro));
- }
-
- void addOutputStatistics() {
- if (mapStores == null || reduceStores == null) {
- LOG.warn("unable to get stores of the job");
- return;
- }
-
- if (mapStores.size() + reduceStores.size() == 1) {
- POStore sto = (mapStores.size() > 0) ? mapStores.get(0)
- : reduceStores.get(0);
- if (!sto.isTmpStore()) {
- long records = (mapStores.size() > 0) ? mapOutputRecords
- : reduceOutputRecords;
- OutputStats ds = new OutputStats(sto.getSFile().getFileName(),
- hdfsBytesWritten, records, (state == JobState.SUCCESS));
- ds.setPOStore(sto);
- ds.setConf(conf);
- outputs.add(ds);
-
- if (state == JobState.SUCCESS) {
- ScriptState.get().emitOutputCompletedNotification(ds);
- }
- }
- } else {
- for (POStore sto : mapStores) {
- if (sto.isTmpStore()) continue;
- addOneOutputStats(sto);
- }
- for (POStore sto : reduceStores) {
- if (sto.isTmpStore()) continue;
- addOneOutputStats(sto);
- }
- }
- }
-
- /**
- * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
- * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
- * defaults to FileBasedOutputSizeReader.
- * @param sto POStore
- * @param conf configuration
- */
- static long getOutputSize(POStore sto, Configuration conf) {
- PigStatsOutputSizeReader reader = null;
- String readerNames = conf.get(
- PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
- FileBasedOutputSizeReader.class.getCanonicalName());
-
- for (String className : readerNames.split(",")) {
- reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
- if (reader.supports(sto)) {
- LOG.info("using output size reader: " + className);
- try {
- return reader.getOutputSize(sto, conf);
- } catch (FileNotFoundException e) {
- LOG.warn("unable to find the output file", e);
- return -1;
- } catch (IOException e) {
- LOG.warn("unable to get byte written of the job", e);
- return -1;
- }
- }
- }
-
- LOG.warn("unable to find an output size reader");
- return -1;
- }
-
- private void addOneOutputStats(POStore sto) {
- long records = -1;
- if (sto.isMultiStore()) {
- Long n = multiStoreCounters.get(PigStatsUtil.getMultiStoreCounterName(sto));
- if (n != null) records = n;
- } else {
- records = mapOutputRecords;
- }
-
- long bytes = getOutputSize(sto, conf);
- String location = sto.getSFile().getFileName();
- OutputStats ds = new OutputStats(location, bytes, records,
- (state == JobState.SUCCESS));
- ds.setPOStore(sto);
- ds.setConf(conf);
- outputs.add(ds);
-
- if (state == JobState.SUCCESS) {
- ScriptState.get().emitOutputCompletedNotification(ds);
- }
- }
-
- void addInputStatistics() {
- if (loads == null) {
- LOG.warn("unable to get inputs of the job");
- return;
- }
-
- if (loads.size() == 1) {
- FileSpec fsp = loads.get(0);
- if (!PigStatsUtil.isTempFile(fsp.getFileName())) {
- long records = mapInputRecords;
- InputStats is = new InputStats(fsp.getFileName(),
- hdfsBytesRead, records, (state == JobState.SUCCESS));
- is.setConf(conf);
- if (isSampler()) is.markSampleInput();
- if (isIndexer()) is.markIndexerInput();
- inputs.add(is);
- }
- } else {
- for (int i=0; i<loads.size(); i++) {
- FileSpec fsp = loads.get(i);
- if (PigStatsUtil.isTempFile(fsp.getFileName())) continue;
- addOneInputStats(fsp.getFileName(), i);
- }
- }
- }
-
- private void addOneInputStats(String fileName, int index) {
- long records = -1;
- Long n = multiInputCounters.get(
- PigStatsUtil.getMultiInputsCounterName(fileName, index));
- if (n != null) {
- records = n;
- } else {
- // the file could be empty
- if (!disableCounter) records = 0;
- else {
- LOG.warn("unable to get input counter for " + fileName);
- }
- }
- InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));
- is.setConf(conf);
- inputs.add(is);
- }
-
- private boolean isSampler() {
+ public boolean isSampler() {
return getFeature().contains(ScriptState.PIG_FEATURE.SAMPLER.name());
}
- private boolean isIndexer() {
+ public boolean isIndexer() {
return getFeature().contains(ScriptState.PIG_FEATURE.INDEXER.name());
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Fri Aug 30 20:04:29 2013
@@ -57,7 +57,7 @@ public final class OutputStats {
private static final Log LOG = LogFactory.getLog(OutputStats.class);
- OutputStats(String location, long bytes, long records, boolean success) {
+ public OutputStats(String location, long bytes, long records, boolean success) {
this.location = location;
this.bytes = bytes;
this.records = records;
@@ -107,7 +107,7 @@ public final class OutputStats {
return conf;
}
- String getDisplayString(boolean local) {
+ public String getDisplayString(boolean local) {
StringBuilder sb = new StringBuilder();
if (success) {
sb.append("Successfully stored ");
@@ -127,11 +127,11 @@ public final class OutputStats {
return sb.toString();
}
- void setPOStore(POStore store) {
+ public void setPOStore(POStore store) {
this.store = store;
}
- void setConf(Configuration conf) {
+ public void setConf(Configuration conf) {
this.conf = conf;
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Aug 30 20:04:29 2013
@@ -38,8 +38,9 @@ import org.apache.pig.impl.util.Spillabl
import org.apache.pig.newplan.BaseOperatorPlan;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.tools.pigstats.JobStats.JobState;
-import org.apache.pig.tools.pigstats.SimplePigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
/**
* PigStats encapsulates the statistics collected from a running script.
@@ -61,7 +62,10 @@ public abstract class PigStats {
private int errorCode = -1;
public static PigStats get() {
- if (tps.get() == null) tps.set(new SimplePigStats());
+ if (tps.get() == null) {
+ LOG.info("PigStats has not been set. Defaulting to SimplePigStats");
+ tps.set(new SimplePigStats());
+ }
return tps.get();
}
@@ -69,8 +73,8 @@ public abstract class PigStats {
tps.set(stats);
}
- static PigStats start() {
- tps.set(new SimplePigStats());
+ public static PigStats start(PigStats stats) {
+ tps.set(stats);
return tps.get();
}
@@ -201,6 +205,41 @@ public abstract class PigStats {
void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
+
+
+    /**
+     * Renders a JobGraph as text: per visited job, "jobId[\t->\tsucc1,succ2,]\n".
+     */
+    public static class JobGraphPrinter extends PlanVisitor {
+
+        StringBuffer buf; // text accumulated by visit(); rendered by toString()
+
+        protected JobGraphPrinter(OperatorPlan plan) {
+            super(plan,
+                    new org.apache.pig.newplan.DependencyOrderWalker(
+                            plan));
+            buf = new StringBuffer();
+        }
+
+        public void visit(JobStats op) throws FrontendException {
+            buf.append(op.getJobId());
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null) {
+                buf.append("\t->\t");
+                for (Operator p : succs) {
+                    buf.append(((JobStats)p).getJobId()).append(",");
+                }
+            }
+            buf.append("\n");
+        }
+
+        @Override
+        public String toString() {
+            // Add the trailing newline to a copy so repeated calls don't grow buf.
+            return buf.toString() + "\n";
+        }
+    }
+
/**
* JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
*/