You are viewing a plain-text version of this message. The canonical (HTML) version is available in the commits@pig.apache.org mailing-list archive.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/07/21 19:39:14 UTC
svn commit: r966326 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/
Author: rding
Date: Wed Jul 21 17:39:14 2010
New Revision: 966326
URL: http://svn.apache.org/viewvc?rev=966326&view=rev
Log:
PIG-1478: Add progress notification listener to PigRunner API
Added:
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/PigRunner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Jul 21 17:39:14 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1478: Add progress notification listener to PigRunner API (rding)
+
PIG-1472: Optimize serialization/deserialization between Map and Reduce and between MR jobs (thejas)
PIG-1389: Implement Pig counter to track number of rows for each input files
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Wed Jul 21 17:39:14 2010
@@ -62,6 +62,7 @@ import org.apache.pig.impl.util.JarManag
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.cmdline.CmdLineParser;
@@ -100,10 +101,10 @@ public class Main {
public static void main(String args[]) {
GenericOptionsParser parser = new GenericOptionsParser(args);
String[] pigArgs = parser.getRemainingArgs();
- System.exit(run(pigArgs));
+ System.exit(run(pigArgs, null));
}
-static int run(String args[]) {
+static int run(String args[], PigProgressNotificationListener listener) {
int rc = 1;
Properties properties = new Properties();
PropertiesUtil.loadDefaultProperties(properties);
@@ -287,6 +288,10 @@ static int run(String args[]) {
// create the static script state object
String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
ScriptState scriptState = ScriptState.start(commandLine);
+ if (listener != null) {
+ scriptState.registerListener(listener);
+ }
+
if(logFileName == null && !userSpecifiedLog) {
logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
Modified: hadoop/pig/trunk/src/org/apache/pig/PigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigRunner.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigRunner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigRunner.java Wed Jul 21 17:39:14 2010
@@ -20,6 +20,7 @@ package org.apache.pig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
@@ -45,10 +46,10 @@ public abstract class PigRunner {
public final static int THROWABLE_EXCEPTION = 8;
}
- public static PigStats run(String[] args) {
+ public static PigStats run(String[] args, PigProgressNotificationListener listener) {
GenericOptionsParser parser = new GenericOptionsParser(args);
String[] pigArgs = parser.getRemainingArgs();
- return PigStatsUtil.getPigStats(Main.run(pigArgs));
+ return PigStatsUtil.getPigStats(Main.run(pigArgs, listener));
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Jul 21 17:39:14 2010
@@ -63,8 +63,10 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.ScriptState;
/**
@@ -142,6 +144,8 @@ public class MapReduceLauncher extends L
List<Job> jobsWithoutIds = jc.getWaitingJobs();
log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
+ ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
+
String jobTrackerAdd;
String port;
String jobTrackerLoc;
@@ -184,6 +188,9 @@ public class MapReduceLauncher extends L
log.info("More information at: http://"+ jobTrackerLoc+
"/jobdetails.jsp?jobid="+job.getAssignedJobID());
}
+
+ ScriptState.get().emitJobStartedNotification(
+ job.getAssignedJobID().toString());
}
else{
// This job is not assigned an id yet.
@@ -194,8 +201,11 @@ public class MapReduceLauncher extends L
double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
if(prog>=(lastProg+0.01)){
int perCom = (int)(prog * 100);
- if(perCom!=100)
+ if(perCom!=100) {
log.info( perCom + "% complete");
+
+ ScriptState.get().emitProgressUpdatedNotification(perCom);
+ }
}
lastProg = prog;
}
@@ -266,6 +276,8 @@ public class MapReduceLauncher extends L
jc.stop();
}
+ ScriptState.get().emitProgressUpdatedNotification(100);
+
log.info( "100% complete");
boolean failed = false;
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Wed Jul 21 17:39:14 2010
@@ -422,6 +422,10 @@ public final class JobStats extends Oper
ds.setPOStore(sto);
ds.setConf(conf);
outputs.add(ds);
+
+ if (state == JobState.SUCCESS) {
+ ScriptState.get().emitOutputCompletedNotification(ds);
+ }
}
} else {
for (POStore sto : mapStores) {
@@ -472,6 +476,10 @@ public final class JobStats extends Oper
ds.setPOStore(sto);
ds.setConf(conf);
outputs.add(ds);
+
+ if (state == JobState.SUCCESS) {
+ ScriptState.get().emitOutputCompletedNotification(ds);
+ }
}
void addInputStatistics() {
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=966326&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Wed Jul 21 17:39:14 2010
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface PigProgressNotificationListener extends java.util.EventListener {
+
+ public void launchStartedNotification(int numJobsToLaunch);
+
+ public void jobsSubmittedNotification(int numJobsSubmitted);
+
+ public void jobStartedNotification(String assignedJobId);
+
+ public void jobFinishedNotification(JobStats jobStats);
+
+ public void jobFailedNotification(JobStats jobStats);
+
+ public void outputCompletedNotification(OutputStats outputStats);
+
+ public void progressUpdatedNotification(int progress);
+
+ public void launchCompletedNotification(int numJobsSucceeded);
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Jul 21 17:39:14 2010
@@ -166,6 +166,8 @@ public abstract class PigStatsUtil {
JobControlCompiler jcc, MROperPlan plan) {
PigStats ps = PigStats.start();
ps.start(pc, client, jcc, plan);
+
+ ScriptState.get().emitLaunchStartedNotification(plan.size());
}
/**
@@ -184,6 +186,8 @@ public abstract class PigStatsUtil {
LOG.error("Error message: " + errMsg);
}
}
+ ScriptState.get().emitLaunchCompletedNotification(
+ ps.getNumberSuccessfulJobs());
if (display) ps.display();
}
@@ -238,16 +242,20 @@ public abstract class PigStatsUtil {
*/
public static void accumulateStats(JobControl jc) {
PigStats ps = PigStats.get();
-
+ ScriptState ss = ScriptState.get();
+
for (Job job : jc.getSuccessfulJobs()) {
- accumulateSuccessStatistics(ps, job);
+ JobStats js = accumulateSuccessStatistics(ps, job);
+ if (js != null) {
+ ss.emitjobFinishedNotification(js);
+ }
}
for (Job job : jc.getFailedJobs()) {
- JobStats js =
- addFailedJobStats(ps, job);
+ JobStats js = addFailedJobStats(ps, job);
if (js != null) {
- js.setErrorMsg(job.getMessage());
+ js.setErrorMsg(job.getMessage());
+ ss.emitJobFailedNotification(js);
} else {
LOG.warn("unable to add failed job stats: " + job);
}
@@ -285,7 +293,7 @@ public abstract class PigStatsUtil {
return js;
}
- private static void accumulateSuccessStatistics(PigStats ps, Job job) {
+ private static JobStats accumulateSuccessStatistics(PigStats ps, Job job) {
JobStats js = ps.addJobStats(job);
if (js == null) {
LOG.warn("unable to add job stats");
@@ -312,6 +320,7 @@ public abstract class PigStatsUtil {
js.addInputStatistics();
}
+ return js;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Jul 21 17:39:14 2010
@@ -177,6 +177,9 @@ public class ScriptState {
private Map<MapReduceOper, String> featureMap = null;
private Map<MapReduceOper, String> aliasMap = null;
+ private List<PigProgressNotificationListener> listeners
+ = new ArrayList<PigProgressNotificationListener>();
+
public static ScriptState start(String commandLine) {
ScriptState ss = new ScriptState(UUID.randomUUID().toString());
ss.setCommandLine(commandLine);
@@ -196,6 +199,58 @@ public class ScriptState {
return tss.get();
}
+ public void registerListener(PigProgressNotificationListener listener) {
+ listeners.add(listener);
+ }
+
+ public void emitLaunchStartedNotification(int numJobsToLaunch) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.launchStartedNotification(numJobsToLaunch);
+ }
+ }
+
+ public void emitJobsSubmittedNotification(int numJobsSubmitted) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobsSubmittedNotification(numJobsSubmitted);
+ }
+ }
+
+ public void emitJobStartedNotification(String assignedJobId) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobStartedNotification(assignedJobId);
+ }
+ }
+
+ public void emitjobFinishedNotification(JobStats jobStats) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobFinishedNotification(jobStats);
+ }
+ }
+
+ public void emitJobFailedNotification(JobStats jobStats) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobFailedNotification(jobStats);
+ }
+ }
+
+ public void emitOutputCompletedNotification(OutputStats outputStats) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.outputCompletedNotification(outputStats);
+ }
+ }
+
+ public void emitProgressUpdatedNotification(int progress) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.progressUpdatedNotification(progress);
+ }
+ }
+
+ public void emitLaunchCompletedNotification(int numJobsSucceeded) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.launchCompletedNotification(numJobsSucceeded);
+ }
+ }
+
public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
LOG.info("Pig script settings are added to the job");
conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jul 21 17:39:14 2010
@@ -35,6 +35,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.junit.After;
@@ -81,7 +82,7 @@ public class TestPigRunner {
try {
String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.isSuccessful());
@@ -109,7 +110,7 @@ public class TestPigRunner {
w.close();
String[] args = { PIG_FILE };
try {
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.isSuccessful());
assertTrue(stats.getJobGraph().size() == 3);
assertTrue(stats.getJobGraph().getSinks().size() == 1);
@@ -147,7 +148,7 @@ public class TestPigRunner {
try {
String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.isSuccessful());
assertTrue(stats.getJobGraph().size() == 1);
assertEquals(5, stats.getRecordWritten());
@@ -186,7 +187,7 @@ public class TestPigRunner {
try {
String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.isSuccessful());
assertTrue(stats.getJobGraph().size() == 1);
assertEquals(4, stats.getRecordWritten());
@@ -218,7 +219,7 @@ public class TestPigRunner {
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
String[] args = { "-c", PIG_FILE };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, null);
assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
assertTrue(stats.getErrorCode() == 1000);
assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: int,a2: int}",
@@ -228,14 +229,14 @@ public class TestPigRunner {
@Test
public void simpleNegativeTest2() throws Exception {
String[] args = { "-c", "-e", "this is a test" };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
}
@Test
public void simpleNegativeTest3() throws Exception {
String[] args = { "-c", "-y" };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
assertEquals("Found unknown option (-y) at position 2",
stats.getErrorMessage());
@@ -257,7 +258,7 @@ public class TestPigRunner {
try {
String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args);
+ PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
assertTrue(stats.getJobGraph().size() == 2);
@@ -296,4 +297,62 @@ public class TestPigRunner {
String name = PigStatsUtil.getMultiInputsCounterName(s);
assertEquals(PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "batchtest", name);
}
+
+ private static class TestNotificationListener implements PigProgressNotificationListener {
+
+ private int numJobsToLaunch = 0;
+ private int numJobsSubmitted = 0;
+ private int numJobStarted = 0;
+ private int numJobFinished = 0;
+
+ @Override
+ public void launchStartedNotification(int numJobsToLaunch) {
+ System.out.println("++++ numJobsToLaunch: " + numJobsToLaunch);
+ this.numJobsToLaunch = numJobsToLaunch;
+ }
+
+ @Override
+ public void jobFailedNotification(JobStats jobStats) {
+ System.out.println("++++ job failed: " + jobStats.getJobId());
+ }
+
+ @Override
+ public void jobFinishedNotification(JobStats jobStats) {
+ System.out.println("++++ job finished: " + jobStats.getJobId());
+ numJobFinished++;
+ }
+
+ @Override
+ public void jobStartedNotification(String assignedJobId) {
+ System.out.println("++++ job started: " + assignedJobId);
+ numJobStarted++;
+ }
+
+ @Override
+ public void jobsSubmittedNotification(int numJobsSubmitted) {
+ System.out.println("++++ jobs submitted: " + numJobsSubmitted);
+ this.numJobsSubmitted += numJobsSubmitted;
+ }
+
+ @Override
+ public void launchCompletedNotification(int numJobsSucceeded) {
+ System.out.println("++++ numJobsSucceeded: " + numJobsSucceeded);
+ System.out.println("");
+ assertEquals(this.numJobsToLaunch, numJobsSucceeded);
+ assertEquals(this.numJobsSubmitted, numJobsSucceeded);
+ assertEquals(this.numJobStarted, numJobsSucceeded);
+ assertEquals(this.numJobFinished, numJobsSucceeded);
+ }
+
+ @Override
+ public void outputCompletedNotification(OutputStats outputStats) {
+ System.out.println("++++ output done: " + outputStats.getLocation());
+ }
+
+ @Override
+ public void progressUpdatedNotification(int progress) {
+ System.out.println("++++ progress: " + progress + "%");
+ }
+
+ }
}