You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/09/27 19:49:49 UTC
svn commit: r1176491 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/
Author: daijy
Date: Tue Sep 27 17:49:49 2011
New Revision: 1176491
URL: http://svn.apache.org/viewvc?rev=1176491&view=rev
Log:
PIG-2208: Restrict number of PIG-generated Hadoop counters
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 27 17:49:49 2011
@@ -259,6 +259,8 @@ PIG-2221: Couldnt find documentation for
BUG FIXES
+PIG-2208: Restrict number of PIG-generated Hadoop counters (rding via daijy)
+
PIG-2299: jetty 6.1.14 startup issue causes unit tests to fail in CI (thw via daijy)
PIG-2301: Some more bin/pig, build.xml cleanup for 0.9.1 (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Sep 27 17:49:49 2011
@@ -500,9 +500,8 @@ public class JobControlCompiler{
.getTemporaryPath(pigContext).toString();
tmpLocation = new Path(tmpLocationStr);
conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, LOG_DIR).toString());
-
- }
+ new Path(tmpLocation, LOG_DIR).toString());
+ }
conf.set("pig.streaming.task.output.dir", outputPathString);
}
else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
@@ -513,8 +512,13 @@ public class JobControlCompiler{
nwJob.setOutputFormatClass(PigOutputFormat.class);
+ boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
+ if (disableCounter) {
+ log.info("Disable Pig custom output counters");
+ }
int idx = 0;
for (POStore sto: storeLocations) {
+ sto.setDisableCounter(disableCounter);
sto.setMultiStore(true);
sto.setIndex(idx++);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Tue Sep 27 17:49:49 2011
@@ -292,9 +292,16 @@ public class PigInputFormat extends Inpu
// also passing the multi-input flag to the back-end so that
// the multi-input record counters can be created
int m = inputs.size();
+
+ boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
+ if ((m > 1) && disableCounter) {
+ log.info("Disable Pig custom input counters");
+ }
+
for (InputSplit split : splits) {
((PigSplit) split).setTotalSplits(n);
if (m > 1) ((PigSplit) split).setMultiInputs(true);
+ ((PigSplit) split).setDisableCounter(disableCounter);
}
return splits;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Tue Sep 27 17:49:49 2011
@@ -177,7 +177,7 @@ public class PigRecordReader extends Rec
loadfunc.prepareToRead(curReader, pigSplit);
}
- if (pigSplit.isMultiInputs()) {
+ if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) {
counterName = getMultiInputsCounerName(pigSplit, inputSpecificConf);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Tue Sep 27 17:49:49 2011
@@ -86,6 +86,12 @@ public class PigSplit extends InputSplit
// back-end to track the number of records for each input.
private boolean isMultiInputs = false;
+ // the flag indicates the custom Hadoop counter should be disabled.
+ // This is to prevent the number of counters exceeding the limit.
+ // This flag is controlled by Pig property "pig.disable.counter" (
+ // the default value is 'false').
+ private boolean disableCounter = false;
+
/**
* the job Configuration
*/
@@ -202,6 +208,7 @@ public class PigSplit extends InputSplit
@SuppressWarnings("unchecked")
public void readFields(DataInput is) throws IOException {
+ disableCounter = is.readBoolean();
isMultiInputs = is.readBoolean();
totalSplits = is.readInt();
splitIndex = is.readInt();
@@ -229,6 +236,7 @@ public class PigSplit extends InputSplit
@SuppressWarnings("unchecked")
public void write(DataOutput os) throws IOException {
+ os.writeBoolean(disableCounter);
os.writeBoolean(isMultiInputs);
os.writeInt(totalSplits);
os.writeInt(splitIndex);
@@ -374,4 +382,12 @@ public class PigSplit extends InputSplit
}
return st.toString();
}
+
+ /**
+ * Sets whether Pig's custom multi-input record counter should be skipped
+ * for this split. The flag is written first in write() and read first in
+ * readFields(), so it travels with the serialized split to the back-end.
+ *
+ * @param disableCounter true to skip creating the custom input counter
+ */
+ public void setDisableCounter(boolean disableCounter) {
+ this.disableCounter = disableCounter;
+ }
+
+ /**
+ * Returns whether the custom multi-input record counter is disabled for
+ * this split (checked by PigRecordReader before building a counter name).
+ *
+ * @return true if the custom input counter should not be created
+ */
+ public boolean disableCounter() {
+ return disableCounter;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Tue Sep 27 17:49:49 2011
@@ -66,6 +66,9 @@ public class POStore extends PhysicalOpe
// flag to distinguish single store from multiquery store.
private boolean isMultiStore;
+ // flag to indicate if the custom counter should be disabled.
+ private boolean disableCounter = false;
+
// the index of multiquery store to track counters
private int index;
@@ -101,7 +104,7 @@ public class POStore extends PhysicalOpe
if (impl != null) {
try{
storer = impl.createStoreFunc(this);
- if (!isTmpStore && impl instanceof MapReducePOStoreImpl) {
+ if (!isTmpStore && !disableCounter && impl instanceof MapReducePOStoreImpl) {
outputRecordCounter =
((MapReducePOStoreImpl) impl).createRecordCounter(this);
}
@@ -282,4 +285,12 @@ public class POStore extends PhysicalOpe
public int getIndex() {
return index;
}
+
+ /**
+ * Sets whether this store should skip creating its custom output record
+ * counter. JobControlCompiler sets this from "pig.disable.counter" for
+ * every store in the multi-store case.
+ *
+ * @param disableCounter true to skip creating the output record counter
+ */
+ public void setDisableCounter(boolean disableCounter) {
+ this.disableCounter = disableCounter;
+ }
+
+ /**
+ * Returns whether the custom output record counter is disabled for this store.
+ *
+ * @return true if no output record counter will be created
+ */
+ public boolean disableCounter() {
+ return disableCounter;
+ }
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java Tue Sep 27 17:49:49 2011
@@ -96,7 +96,7 @@ public final class InputStats {
sb.append("read ");
}
- if (!local) {
+ if (!local && records >= 0) {
sb.append(records).append(" records ");
} else {
sb.append("records ");
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Tue Sep 27 17:49:49 2011
@@ -95,6 +95,8 @@ public final class JobStats extends Oper
private String errorMsg;
private Exception exception = null;
+
+ private Boolean disableCounter = false;
@SuppressWarnings("deprecation")
private JobID jobId;
@@ -270,6 +272,7 @@ public final class JobStats extends Oper
.get(JobControlCompiler.PIG_REDUCE_STORES));
loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
.get("pig.inputs"));
+ disableCounter = conf.getBoolean("pig.disable.counter", false);
} catch (IOException e) {
LOG.warn("Failed to deserialize the store list", e);
}
@@ -545,13 +548,17 @@ public final class JobStats extends Oper
}
private void addOneInputStats(String fileName, int index) {
- long records = 0;
+ long records = -1;
Long n = multiInputCounters.get(
PigStatsUtil.getMultiInputsCounterName(fileName, index));
if (n != null) {
records = n;
} else {
- LOG.warn("unable to get input counter for " + fileName);
+ // the file could be empty
+ if (!disableCounter) records = 0;
+ else {
+ LOG.warn("unable to get input counter for " + fileName);
+ }
}
InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));
is.setConf(conf);
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Tue Sep 27 17:49:49 2011
@@ -111,7 +111,7 @@ public final class OutputStats {
StringBuilder sb = new StringBuilder();
if (success) {
sb.append("Successfully stored ");
- if (!local) {
+ if (!local && records >= 0) {
sb.append(records).append(" records ");
} else {
sb.append("records ");
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1176491&r1=1176490&r2=1176491&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Tue Sep 27 17:49:49 2011
@@ -738,6 +738,74 @@ public class TestPigRunner {
}
}
+ @Test // PIG-2208: Restrict number of PIG-generated Hadoop counters
+ // Multi-input case: a self-join of INPUT_FILE run with
+ // pig.disable.counter=true. The per-input record counts must be reported
+ // as -1 (unknown), while the single-store output count is still available.
+ public void testDisablePigCounters() throws Exception {
+ PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
+ w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w1.println("B = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w1.println("C = join A by a0, B by a0;");
+ w1.println("store C into '" + OUTPUT_FILE + "';");
+ w1.close();
+
+ try {
+ String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
+
+ assertTrue(stats.isSuccessful());
+
+ assertEquals(1, stats.getNumberJobs());
+ List<InputStats> inputs = stats.getInputStats();
+ assertEquals(2, inputs.size());
+ for (InputStats instats : inputs) {
+ // the multi-input counters are disabled
+ assertEquals(-1, instats.getNumberRecords());
+ }
+
+ // single store: the output count does not depend on the disabled
+ // multi-store counters; 9 presumably reflects INPUT_FILE's
+ // contents (defined outside this view)
+ List<OutputStats> outputs = stats.getOutputStats();
+ assertEquals(1, outputs.size());
+ OutputStats outstats = outputs.get(0);
+ assertEquals(9, outstats.getNumberRecords());
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ }
+ }
+
+ @Test // PIG-2208: Restrict number of PIG-generated Hadoop counters
+ // Multi-store case: one input, two stores, run with
+ // pig.disable.counter=true. Both outputs must report -1 (unknown) records,
+ // while the single input's record count is still available.
+ public void testDisablePigCounters2() throws Exception {
+
+ PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
+ w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w1.println("B = filter A by a0 > 3;");
+ // NOTE(review): stores into the literal 'output', but the finally block
+ // deletes OUTPUT_FILE; this assumes OUTPUT_FILE == "output" - confirm.
+ w1.println("store A into 'output';");
+ w1.println("store B into 'tmp/output';");
+ w1.close();
+
+ try {
+ String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener());
+
+ assertTrue(stats.isSuccessful());
+
+ assertEquals(1, stats.getNumberJobs());
+ List<OutputStats> outputs = stats.getOutputStats();
+ assertEquals(2, outputs.size());
+ for (OutputStats outstats : outputs) {
+ // the multi-output counters are disabled
+ assertEquals(-1, outstats.getNumberRecords());
+ }
+
+ // single input: the multi-input counter path is not used, so the
+ // record count is still reported
+ List<InputStats> inputs = stats.getInputStats();
+ assertEquals(1, inputs.size());
+ InputStats instats = inputs.get(0);
+ assertEquals(5, instats.getNumberRecords());
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, "tmp/output");
+ }
+ }
+
public static class TestNotificationListener implements PigProgressNotificationListener {
private Map<String, int[]> numMap = new HashMap<String, int[]>();