You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/19 02:07:37 UTC
svn commit: r986989 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/data/ src/org/apache/pig/tools/pigstats/
Author: thejas
Date: Thu Aug 19 00:07:36 2010
New Revision: 986989
URL: http://svn.apache.org/viewvc?rev=986989&view=rev
Log:
PIG-1524: 'Proactive spill count' is misleading (thejas)
Added:
hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigCounters.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 19 00:07:36 2010
@@ -132,6 +132,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1524: 'Proactive spill count' is misleading (thejas)
+
PIG-1546: Incorrect assert statements in operator evaluation (ajaykidave via
pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigCounters.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigCounters.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigCounters.java Thu Aug 19 00:07:36 2010
@@ -23,5 +23,10 @@ package org.apache.pig;
*/
public enum PigCounters {
SPILLABLE_MEMORY_MANAGER_SPILL_COUNT,
- PROACTIVE_SPILL_COUNT;
+
+ // total number of bags that have spilled proactively
+ PROACTIVE_SPILL_COUNT_BAGS,
+
+ //total number of records that have been spilled to disk
+ PROACTIVE_SPILL_COUNT_RECS;
}
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Aug 19 00:07:36 2010
@@ -380,9 +380,13 @@ public abstract class DefaultAbstractBag
}
protected void incSpillCount(Enum counter) {
+ incSpillCount(counter, 1);
+ }
+
+ protected void incSpillCount(Enum counter, long numRecsSpilled) {
PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null && reporter.getCounter(counter)!=null) {
- reporter.getCounter(counter).increment(1);
+ reporter.getCounter(counter).increment(numRecsSpilled);
} else {
PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Aug 19 00:07:36 2010
@@ -17,13 +17,22 @@
*/
package org.apache.pig.data;
-import java.io.*;
-import java.util.*;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigCounters;
-import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -37,6 +46,9 @@ public class InternalCachedBag extends D
private transient DataOutputStream out;
private transient boolean addDone;
private transient TupleFactory factory;
+
+ // used to store number of tuples spilled until counter is incremented
+ private transient int numTuplesSpilled = 0;
public InternalCachedBag() {
this(1);
@@ -93,19 +105,21 @@ public class InternalCachedBag extends D
}
}
} else {
+ // above cacheLimit, spill to disk
try {
if(out == null) {
if (log.isDebugEnabled()) {
log.debug("Memory can hold "+ mContents.size() + " records, put the rest in spill file.");
}
out = getSpillFile();
-
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
}
t.write(out);
- if (cacheLimit!= 0 && mContents.size() % cacheLimit == 0) {
- /* Increment the spill count*/
- incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
+ //periodically update number of tuples spilled
+ numTuplesSpilled++;
+ if(numTuplesSpilled > 1000){
+ updateSpillRecCounter();
}
}
catch(IOException e) {
@@ -116,6 +130,11 @@ public class InternalCachedBag extends D
mSize++;
}
+ private void updateSpillRecCounter() {
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, numTuplesSpilled);
+ numTuplesSpilled = 0;
+ }
+
public void addAll(DataBag b) {
Iterator<Tuple> iter = b.iterator();
while(iter.hasNext()) {
@@ -140,6 +159,8 @@ public class InternalCachedBag extends D
// ignore
}
}
+ if(numTuplesSpilled > 0)
+ updateSpillRecCounter();
addDone = true;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Aug 19 00:07:36 2010
@@ -54,7 +54,7 @@ import org.apache.pig.backend.hadoop.exe
* This bag spills pro-actively when the number of tuples in memory
* reaches a limit
*/
-public class InternalDistinctBag extends DefaultAbstractBag {
+public class InternalDistinctBag extends SortedSpillBag {
/**
*
@@ -70,7 +70,7 @@ public class InternalDistinctBag extends
private transient int cacheLimit;
private transient long maxMemUsage;
private transient long memUsage;
-
+
public InternalDistinctBag() {
this(1, -1.0);
}
@@ -145,7 +145,7 @@ public class InternalDistinctBag extends
}
if (mContents.size() > cacheLimit) {
- spill();
+ proactive_spill(null);
}
if (mContents.add(t)) {
@@ -178,61 +178,8 @@ public class InternalDistinctBag extends
}
}
- public long spill() {
- // Make sure we have something to spill. Don't create empty
- // files, as that will make a mess.
- if (mContents.size() == 0) return 0;
-
- // Lock the container before I spill, so that iterators aren't
- // trying to read while I'm mucking with the container.
- long spilled = 0;
-
- DataOutputStream out = null;
- try {
- out = getSpillFile();
- } catch (IOException ioe) {
- // Do not remove last file from spilled array. It was not
- // added as File.createTmpFile threw an IOException
- warn(
- "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
- return 0;
- }
- try {
- Tuple[] array = new Tuple[mContents.size()];
- mContents.toArray(array);
- Arrays.sort(array);
- for (int i = 0; i < array.length; i++) {
- array[i].write(out);
- spilled++;
- // This will spill every 16383 records.
- if ((spilled & 0x3fff) == 0) reportProgress();
- }
-
- out.flush();
- } catch (IOException ioe) {
- // Remove the last file from the spilled array, since we failed to
- // write to it.
- mSpillFiles.remove(mSpillFiles.size() - 1);
- warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
- return 0;
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
- }
- }
- }
- mContents.clear();
- memUsage = 0;
-
- // Increment the spill count
- incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
- return spilled;
- }
-
+
+
/**
* An iterator that handles getting the next tuple from the bag.
* Data can be stored in a combination of in memory and on disk.
@@ -533,4 +480,9 @@ public class InternalDistinctBag extends
}
}
}
+
+ public long spill(){
+ return proactive_spill(null);
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Aug 19 00:07:36 2010
@@ -18,9 +18,7 @@
package org.apache.pig.data;
import java.io.BufferedInputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
@@ -35,12 +33,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.PriorityQueue;
-
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigCounters;
-import org.apache.pig.PigWarning;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -56,7 +51,7 @@ import org.apache.pig.backend.hadoop.exe
* This bag is not registered with SpillableMemoryManager. It calculates
* the number of tuples to hold in memory and spill pro-actively into files.
*/
-public class InternalSortedBag extends DefaultAbstractBag{
+public class InternalSortedBag extends SortedSpillBag{
/**
*
@@ -142,7 +137,7 @@ public class InternalSortedBag extends D
}
if (mContents.size() > cacheLimit) {
- spill();
+ proactive_spill(mComp);
}
mContents.add(t);
@@ -187,61 +182,6 @@ public class InternalSortedBag extends D
return new SortedDataBagIterator();
}
- public long spill() {
- // Make sure we have something to spill. Don't create empty
- // files, as that will make a mess.
- if (mContents.size() == 0) return 0;
-
- // Lock the container before I spill, so that iterators aren't
- // trying to read while I'm mucking with the container.
- long spilled = 0;
-
- DataOutputStream out = null;
- try {
- out = getSpillFile();
- } catch (IOException ioe) {
- // Do not remove last file from spilled array. It was not
- // added as File.createTmpFile threw an IOException
- warn(
- "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
- return 0;
- }
- try {
-
- Collections.sort((ArrayList<Tuple>)mContents, mComp);
-
- Iterator<Tuple> i = mContents.iterator();
- while (i.hasNext()) {
- i.next().write(out);
- spilled++;
- // This will spill every 16383 records.
- if ((spilled & 0x3fff) == 0) reportProgress();
- }
- out.flush();
- } catch (IOException ioe) {
- // Remove the last file from the spilled array, since we failed to
- // write to it.
- mSpillFiles.remove(mSpillFiles.size() - 1);
- warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
- return 0;
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
- }
- }
- }
- mContents.clear();
- memUsage = 0;
-
- // Increment the spill count
- incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
- return spilled;
- }
-
/**
* An iterator that handles getting the next tuple from the bag.
* Data can be stored in a combination of in memory and on disk.
@@ -523,4 +463,9 @@ public class InternalSortedBag extends D
}
}
}
+
+ public long spill(){
+ return proactive_spill(mComp);
+ }
+
}
Added: hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java?rev=986989&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java Thu Aug 19 00:07:36 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.pig.PigCounters;
+import org.apache.pig.PigWarning;
+
+/**
+ * Common functionality for proactively spilling bags that need to keep the data
+ * sorted.
+ */
+public abstract class SortedSpillBag extends DefaultAbstractBag {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Sort contents of mContents and write them to disk
+ * @param comp Comparator to sort contents of mContents
+ * @return number of tuples spilled
+ */
+ public long proactive_spill(Comparator<Tuple> comp) {
+ // Make sure we have something to spill. Don't create empty
+ // files, as that will make a mess.
+ if (mContents.size() == 0) return 0;
+
+ //count for number of objects that have spilled
+ if(mSpillFiles == null)
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
+
+ long spilled = 0;
+
+ DataOutputStream out = null;
+ try {
+ out = getSpillFile();
+ } catch (IOException ioe) {
+ // Do not remove last file from spilled array. It was not
+ // added as File.createTempFile threw an IOException
+ warn(
+ "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
+ return 0;
+ }
+ try {
+ //sort the tuples
+ // as per documentation of Collections.sort(), it copies to an array,
+ // sorts and copies back to collection
+ // Avoiding that extra copy back to collection (mContents) by
+ // copying to an array and using Arrays.sort
+ Tuple[] array = new Tuple[mContents.size()];
+ mContents.toArray(array);
+ if(comp == null)
+ Arrays.sort(array);
+ else
+ Arrays.sort(array,comp);
+
+ //dump the array
+ for (Tuple t : array) {
+ t.write(out);
+ spilled++;
+ // Report progress every 16384 records.
+ if ((spilled & 0x3fff) == 0) reportProgress();
+ }
+
+ out.flush();
+ } catch (IOException ioe) {
+ // Remove the last file from the spilled array, since we failed to
+ // write to it.
+ mSpillFiles.remove(mSpillFiles.size() - 1);
+ warn(
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+ return 0;
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
+ }
+ }
+ mContents.clear();
+
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
+
+ return spilled;
+ }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Thu Aug 19 00:07:36 2010
@@ -107,7 +107,8 @@ public final class JobStats extends Oper
private long hdfsBytesWritten = 0;
private long hdfsBytesRead = 0;
private long spillCount = 0;
- private long activeSpillCount = 0;
+ private long activeSpillCountObj = 0;
+ private long activeSpillCountRecs = 0;
private HashMap<String, Long> multiStoreCounters
= new HashMap<String, Long>();
@@ -162,7 +163,9 @@ public final class JobStats extends Oper
public long getSMMSpillCount() { return spillCount; }
- public long getProactiveSpillCount() { return activeSpillCount; }
+ public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
+
+ public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
public long getHdfsBytesWritten() { return hdfsBytesWritten; }
@@ -340,9 +343,11 @@ public final class JobStats extends Oper
spillCount = counters.findCounter(
PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
.getCounter();
- activeSpillCount = counters.findCounter(
- PigCounters.PROACTIVE_SPILL_COUNT).getCounter();
-
+ activeSpillCountObj = counters.findCounter(
+ PigCounters.PROACTIVE_SPILL_COUNT_BAGS).getCounter();
+ activeSpillCountRecs = counters.findCounter(
+ PigCounters.PROACTIVE_SPILL_COUNT_RECS).getCounter();
+
Iterator<Counter> iter = multistoregroup.iterator();
while (iter.hasNext()) {
Counter cter = iter.next();
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=986989&r1=986988&r2=986989&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Aug 19 00:07:36 2010
@@ -357,13 +357,25 @@ public final class PigStats {
}
/**
- * Returns the total spill counts from {@link InternalCachedBag}.
+ * Returns the total number of bags that spilled proactively
*/
- public long getProactiveSpillCount() {
+ public long getProactiveSpillCountObjects() {
Iterator<JobStats> it = jobPlan.iterator();
long ret = 0;
while (it.hasNext()) {
- ret += it.next().getProactiveSpillCount();
+ ret += it.next().getProactiveSpillCountObjects();
+ }
+ return ret;
+ }
+
+ /**
+ * Returns the total number of records that spilled proactively
+ */
+ public long getProactiveSpillCountRecords() {
+ Iterator<JobStats> it = jobPlan.iterator();
+ long ret = 0;
+ while (it.hasNext()) {
+ ret += it.next().getProactiveSpillCountRecs();
}
return ret;
}
@@ -587,8 +599,10 @@ public final class PigStats {
sb.append("Total bytes written : " + getBytesWritten()).append("\n");
sb.append("Spillable Memory Manager spill count : "
+ getSMMSpillCount()).append("\n");
- sb.append("Proactive spill count : "
- + getProactiveSpillCount()).append("\n");
+ sb.append("Total bags proactively spilled: "
+ + getProactiveSpillCountObjects()).append("\n");
+ sb.append("Total records proactively spilled: "
+ + getProactiveSpillCountRecords()).append("\n");
sb.append("\nJob DAG:\n").append(jobPlan.toString());