Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/19 01:58:18 UTC

svn commit: r882021 - in /hadoop/pig/branches/load-store-redesign/src/org/apache/pig: backend/hadoop/executionengine/mapReduceLayer/ backend/hadoop/executionengine/physicalLayer/relationalOperators/ impl/builtin/

Author: pradeepkth
Date: Thu Nov 19 00:58:17 2009
New Revision: 882021

URL: http://svn.apache.org/viewvc?rev=882021&view=rev
Log:
PIG-966: load-store-redesign branch: change SampleLoader and subclasses to work with new LoadFunc interface (thejas via pradeepkth)

Added:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java
Modified:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 19 00:58:17 2009
@@ -44,7 +44,7 @@
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.builtin.MergeJoinIndexer;
-import org.apache.pig.impl.builtin.TupleSize;
+import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -1387,6 +1387,7 @@
 				throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.");
 			}
 			
+			//change plan to store the first join input into a temp file
 			FileSpec fSpec = getTempFileSpec();
 			MapReduceOper mro = compiledInputs[0];
 			POStore str = getStore();
@@ -1460,7 +1461,10 @@
 			}     		      
 			
 			// run POPartitionRearrange for second join table
-			lr = new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);            
+			POPartitionRearrange pr = 
+			    new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+			pr.setPigContext(pigContext);
+			lr = pr;
 			try {
 				lr.setIndex(1);
 			} catch (ExecException e) {
@@ -1817,7 +1821,7 @@
         
     	PhysicalPlan ep = new PhysicalPlan();
     	POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
-    	            new FuncSpec(TupleSize.class.getName(), (String[])null));
+    	            new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
     	uf.setResultType(DataType.TUPLE);
     	ep.add(uf);     
     	ep.add(prjStar);
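
The sampling plan now computes a (memory size, row count) pair per sample
through GetMemNumRows, wired in via a FuncSpec. As a rough sketch (not part
of the commit) of how such a FuncSpec resolves to a UDF instance, using the
same instantiateFuncFromSpec mechanism that SampleLoader's constructor uses
later in this commit:

    import org.apache.pig.FuncSpec;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.builtin.GetMemNumRows;

    public class FuncSpecSketch {
        public static void main(String[] args) {
            // same construction as the MRCompiler change above
            FuncSpec spec = new FuncSpec(GetMemNumRows.class.getName(),
                    (String[]) null);
            // resolve the spec to an instance of the UDF
            Object udf = PigContext.instantiateFuncFromSpec(spec);
            System.out.println(udf.getClass().getName());
        }
    }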

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Nov 19 00:58:17 2009
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 
 
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.builtin.BinStorage;
@@ -32,6 +33,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -66,14 +68,15 @@
      * 
      */
     private static final long serialVersionUID = 1L;
-	private String partitionFile;
-	private Integer totalReducers = -1;
-	// ReducerMap will store the tuple, max reducer index & min reducer index
-	private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
-	private boolean loaded;
-
-	protected static final BagFactory mBagFactory = BagFactory.getInstance();
-
+    private String partitionFile;
+    private Integer totalReducers = -1;
+    // ReducerMap will store the tuple, max reducer index & min reducer index
+    private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
+    private boolean loaded;
+
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+    private PigContext pigContext;
+    
     public POPartitionRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -102,17 +105,22 @@
 		partitionFile = file;
 	}
 
-	/* Loads the key distribution file obtained from the sampler */
-	private void loadPartitionFile() throws RuntimeException {
-		try {
-			Integer [] redCnt = new Integer[1]; 
-			reducerMap = MapRedUtil.loadPartitionFile(partitionFile, redCnt, null, DataType.NULL);
-			totalReducers = redCnt[0];
-			loaded = true;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
+    /* Loads the key distribution file obtained from the sampler */
+    private void loadPartitionFile() throws RuntimeException {
+        try {
+            Integer [] redCnt = new Integer[1]; 
+            
+            reducerMap = MapRedUtil.loadPartitionFile(partitionFile, 
+                    redCnt, 
+                    ConfigurationUtil.toConfiguration(pigContext.getProperties()),
+                    DataType.NULL
+            );
+            totalReducers = redCnt[0];
+            loaded = true;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     @Override
     public String name() {
@@ -234,6 +242,20 @@
     }
 
     /**
+     * @param pigContext the pigContext to set
+     */
+    public void setPigContext(PigContext pigContext) {
+        this.pigContext = pigContext;
+    }
+
+    /**
+     * @return the pigContext
+     */
+    public PigContext getPigContext() {
+        return pigContext;
+    }
+
+    /**
      * Make a deep copy of this operator.  
      * @throws CloneNotSupportedException
      */
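
The substantive change above: loadPartitionFile() now hands
MapRedUtil.loadPartitionFile() a real Hadoop Configuration built from the
PigContext properties instead of null. A minimal standalone sketch of that
conversion step (the property and its value are illustrative, not from the
commit):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

    public class ConfFromPropsSketch {
        public static void main(String[] args) {
            // stand-in for pigContext.getProperties()
            Properties props = new Properties();
            props.setProperty("fs.default.name", "file:///");

            // the same conversion loadPartitionFile() performs above
            Configuration conf = ConfigurationUtil.toConfiguration(props);
            System.out.println(conf.get("fs.default.name"));
        }
    }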

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java?rev=882021&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java Thu Nov 19 00:58:17 2009
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * UDF that computes the memory size of a tuple and extracts the number-of-rows
+ * value from the special tuple created by PoissonSampleLoader.
+ * It is used by skewed join.
+ * 
+ */
+
+public class GetMemNumRows extends EvalFunc<Tuple>{          
+
+    private TupleFactory factory;
+	
+    public GetMemNumRows() {
+    	factory = TupleFactory.getInstance();
+    }      
+
+    /**
+     * @param  in - input tuple
+     * @return - tuple containing the in-memory size of the input tuple, and
+     * numRows if the input is the specially marked tuple carrying the
+     * number-of-rows field
+     */    
+    public Tuple exec(Tuple in) throws IOException {
+    	if (in == null) {
+    	    return null;
+    	}
+    	long memSize = in.getMemorySize();
+    	long numRows = 0;
+
+    	
+    	// if this is the specially marked tuple, get the number of rows
+        int tSize = in.size();
+    	if(tSize >=2 &&
+    	        in.get(tSize-2).equals(PoissonSampleLoader.NUMROWS_TUPLE_MARKER)){
+    	    numRows = (Long)in.get(tSize-1);
+    	}
+    	
+    	//create tuple to be returned
+    	Tuple t = factory.newTuple(2);
+    	t.set(0, memSize);
+    	t.set(1, numRows);
+    	return t;
+    }
+    
+    public Type getReturnType(){
+        return Tuple.class;
+    }       
+}
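
To see what the new UDF produces, here is a small driver (not part of the
commit; the field values are invented) that feeds GetMemNumRows an ordinary
sample row and then the special marker row appended by PoissonSampleLoader:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.builtin.GetMemNumRows;
    import org.apache.pig.impl.builtin.PoissonSampleLoader;

    public class GetMemNumRowsSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            GetMemNumRows udf = new GetMemNumRows();

            // ordinary sample row: exec() returns (memSize, 0)
            Tuple plain = tf.newTuple(2);
            plain.set(0, "key1");
            plain.set(1, "value1");
            System.out.println(udf.exec(plain));

            // marker row carrying the total row count:
            // exec() returns (memSize, 1000)
            Tuple marker = tf.newTuple(4);
            marker.set(0, "key1");
            marker.set(1, "value1");
            marker.set(2, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
            marker.set(3, 1000L);
            System.out.println(udf.exec(marker));
        }
    }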

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Nov 19 00:58:17 2009
@@ -33,7 +33,6 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.Pair;
 
 /**
@@ -82,8 +81,6 @@
 
 	private String inputFile_;
 
-	private long inputFileSize_;
-
 	private long totalSampleCount_;
 
 	private double heapPercentage_;
@@ -100,7 +97,6 @@
 	public PartitionSkewedKeys(String[] args) {
 		totalReducers_ = -1;
 		currentIndex_ = 0;
-		inputFileSize_ = -1;
 
 		if (args != null && args.length > 0) {
 			heapPercentage_ = Double.parseDouble(args[0]);
@@ -123,187 +119,177 @@
 	 * first field in the input tuple is the number of reducers
 	 * 
 	 * second field is the *sorted* bag of samples
+	 * This should be called only once.
 	 */
 	public Map<String, Object> exec(Tuple in) throws IOException {
-		// get size of input file in bytes
-		if (inputFileSize_ == -1) {
-			try {
-				inputFileSize_ = FileLocalizer.getSize(inputFile_);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		Map<String, Object> output = new HashMap<String, Object>();
-
-		if (in == null || in.size() == 0) {			
-			return null;
-		}
-
-		totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
-		log.info("Maximum of available memory is " + totalMemory_);
-
-		ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
-		Tuple currentTuple = null;
-		long count = 0;
-		long totalMSize = 0;
-		long totalDSize = 0;
-		try {
-			totalReducers_ = (Integer) in.get(0);
-			DataBag samples = (DataBag) in.get(1);
-
-			totalSampleCount_ = samples.size();
-			
-			log.info("inputFileSize: " + inputFileSize_);
-			log.info("totalSample: " + totalSampleCount_);
-			log.info("totalReducers: " + totalReducers_);			
-
-			int maxReducers = 0;
-			Iterator<Tuple> iter = samples.iterator();
-			while (iter.hasNext()) {
-				Tuple t = iter.next();
-				if (hasSameKey(currentTuple, t) || currentTuple == null) {
-					count++;
-					totalMSize += getMemorySize(t);
-					totalDSize += getDiskSize(t);
-				} else {
-					Pair<Tuple, Integer> p = calculateReducers(currentTuple,
-							count, totalMSize, totalDSize);
-					Tuple rt = p.first;
-					if (rt != null) {
-						reducerList.add(rt);
-					}
-					if (maxReducers < p.second) {
-						maxReducers = p.second;
-					}
-					count = 1;
-					totalMSize = getMemorySize(t);
-					totalDSize = getDiskSize(t);
-				}
-
-				currentTuple = t;
-			}
-
-			// add last key
-			if (count > 0) {
-				Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
-						totalMSize, totalDSize);
-				Tuple rt = p.first;
-				if (rt != null) {
-					reducerList.add(rt);
-				}
-				if (maxReducers < p.second) {
-					maxReducers = p.second;
-				}
-			}
-
-			if (maxReducers > totalReducers_) {
-				if(pigLogger != null) {
-                    pigLogger.warn(this,"You need at least " + maxReducers
-                    		+ " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
-                } else {
-    				log.warn("You need at least " + maxReducers
-    						+ " reducers to avoid spillage and run this job efficiently.");
-                }
-			}
-
-			output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
-			output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-			
-			log.info(output.toString());
-			if (log.isDebugEnabled()) {
-				log.debug(output.toString());
-			}
-
-			return output;
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		}
+	    if (in == null || in.size() == 0) {                     
+	        return null;
+	    }
+	    Map<String, Object> output = new HashMap<String, Object>();
+
+	    totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+	    log.info("Maximum available memory is " + totalMemory_);
+
+	    ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+	    Tuple currentTuple = null;
+	    long count = 0;
+	    
+	    // total size in memory for tuples in sample 
+	    long totalSampleMSize = 0;
+	    
+	    //total input rows for the join
+	    long totalInputRows = 0;
+	    
+	    try {
+	        totalReducers_ = (Integer) in.get(0);
+	        DataBag samples = (DataBag) in.get(1);
+
+	        totalSampleCount_ = samples.size();
+
+	        log.info("totalSample: " + totalSampleCount_);
+	        log.info("totalReducers: " + totalReducers_);			
+
+	        int maxReducers = 0;
+
+	        // first iterate the samples to find total number of rows
+	        Iterator<Tuple> iter1 = samples.iterator();
+	        while (iter1.hasNext()) {
+	            Tuple t = iter1.next();
+	            totalInputRows += (Long)t.get(t.size() - 1);
+	        }
+	                        
+	        // now iterate samples to do the reducer calculation
+	        Iterator<Tuple> iter2 = samples.iterator();
+	        while (iter2.hasNext()) {
+	            Tuple t = iter2.next();
+	            if (hasSameKey(currentTuple, t) || currentTuple == null) {
+	                count++;
+	                totalSampleMSize += getMemorySize(t);
+	            } else {
+	                Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+	                        count, totalSampleMSize, totalInputRows);
+	                Tuple rt = p.first;
+	                if (rt != null) {
+	                    reducerList.add(rt);
+	                }
+	                if (maxReducers < p.second) {
+	                    maxReducers = p.second;
+	                }
+	                count = 1;
+	                totalSampleMSize = getMemorySize(t);
+	            }
+
+	            currentTuple = t;
+	        }
+
+	        // add last key
+	        if (count > 0) {
+	            Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+	                    totalSampleMSize, totalInputRows);
+	            Tuple rt = p.first;
+	            if (rt != null) {
+	                reducerList.add(rt);
+	            }
+	            if (maxReducers < p.second) {
+	                maxReducers = p.second;
+	            }
+	        }
+
+	        if (maxReducers > totalReducers_) {
+	            if(pigLogger != null) {
+	                pigLogger.warn(this,"You need at least " + maxReducers
+	                        + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+	            } else {
+	                log.warn("You need at least " + maxReducers
+	                        + " reducers to avoid spillage and run this job efficiently.");
+	            }
+	        }
+
+	        output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+	        output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+	        log.info(output.toString());
+	        if (log.isDebugEnabled()) {
+	            log.debug(output.toString());
+	        }
+
+	        return output;
+	    } catch (Exception e) {
+	        e.printStackTrace();
+	        throw new RuntimeException(e);
+	    }
 	}
 
 	private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
-			long count, long totalMSize, long totalDSize) {
-		// get average memory size per tuple
-		double avgM = totalMSize / (double) count;
-		// get average disk size per tuple
-		double avgD = totalDSize / (double) count;
-
-		// get the number of tuples that can fit into memory
-		long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
-		// get the number of total tuples for this key
-		long tupleCount = (long) (((double) count) / totalSampleCount_
-				* inputFileSize_ / avgD);	
-
-
-		int redCount = (int) Math.round(Math.ceil((double) tupleCount
-				/ tupleMCount));
-
-		if (log.isDebugEnabled()) 
-		{
-			log.debug("avgM: " + avgM);
-			log.debug("avgD: " + avgD);
-			log.debug("count: " + count);
-			log.debug("A reducer can take " + tupleMCount + " tuples and "
-					+ tupleCount + " tuples are find for " + currentTuple);
-			log.debug("key " + currentTuple + " need " + redCount + " reducers");
-		}
+	        long count, long totalMSize, long totalTuples) {
+	    // get average memory size per tuple
+	    double avgM = totalMSize / (double) count;
+
+	    // get the number of tuples that can fit into memory
+	    long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+	    // estimate the number of total tuples for this key
+	    long keyTupleCount = (long) (((double) count / totalSampleCount_) *
+	            totalTuples);
+                            
+	    
+	    int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
+	            / tupleMCount));
+
+	    if (log.isDebugEnabled()) 
+	    {
+	        log.debug("avgM: " + avgM);
+	        log.debug("tuple count: " + keyTupleCount);
+	        log.debug("count: " + count);
+	        log.debug("A reducer can take " + tupleMCount + " tuples and "
+	                + keyTupleCount + " tuples are found for " + currentTuple);
+	        log.debug("key " + currentTuple + " needs " + redCount + " reducers");
+	    }
+
+	    // this is not a skewed key
+	    if (redCount == 1) {
+	        return new Pair<Tuple, Integer>(null, 1);
+	    }
+
+	    Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+	    int i = 0;
+	    try {
+	        // set keys
+	        for (; i < currentTuple.size() - 2; i++) {
+	            t.set(i, currentTuple.get(i));
+	        }
+
+	        // set the min index of reducer for this key
+	        t.set(i++, currentIndex_);
+	        currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
+	        if (currentIndex_ < 0) {
+	            currentIndex_ += totalReducers_;
+	        }
+	        // set the max index of reducer for this key
+	        t.set(i++, currentIndex_);
+	    } catch (ExecException e) {
+	        throw new RuntimeException("Failed to set value to tuple." + e);
+	    }
 
-		// this is not a skewed key
-		if (redCount == 1) {
-			return new Pair<Tuple, Integer>(null, 1);
-		}
+	    currentIndex_ = (currentIndex_ + 1) % totalReducers_;
 
-		Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
-		int i = 0;
-		try {
-			// set keys
-			for (; i < currentTuple.size() - 2; i++) {
-				t.set(i, currentTuple.get(i));
-			}
+	    Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
 
-			// set the min index of reducer for this key
-			t.set(i++, currentIndex_);
-			currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
-			if (currentIndex_ < 0) {
-				currentIndex_ += totalReducers_;
-			}
-			// set the max index of reducer for this key
-			t.set(i++, currentIndex_);
-		} catch (ExecException e) {
-			throw new RuntimeException("Failed to set value to tuple." + e);
-		}
-
-		currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
-		Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
-		return p;
+	    return p;
 	}
 
 	// the last field of the tuple is a tuple for memory size and disk size
 	private long getMemorySize(Tuple t) {
-		int s = t.size();
-		try {
-			return (Long) t.get(s - 2);
-		} catch (ExecException e) {
-			throw new RuntimeException(
-					"Unable to retrive the size field from tuple.", e);
-		}
+	    int s = t.size();
+	    try {
+	        return (Long) t.get(s - 2);
+	    } catch (ExecException e) {
+	        throw new RuntimeException(
+	                "Unable to retrieve the size field from tuple.", e);
+	    }
 	}
 
-	// the last field of the tuple is a tuple for memory size and disk size
-	private long getDiskSize(Tuple t) {
-		int s = t.size();
-		try {
-			return (Long) t.get(s - 1);
-		} catch (ExecException e) {
-			throw new RuntimeException(
-					"Unable to retrive the size field from tuple.", e);
-		}
-	}
 
 	private boolean hasSameKey(Tuple t1, Tuple t2) {
 		// Have to break the tuple down and compare it field to field.
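
The per-key reducer estimate in calculateReducers() now scales the sampled
row count by the total input rows (taken from the marker tuples) instead of
the old disk-size heuristic. A self-contained sketch of that arithmetic,
with invented numbers (none come from the commit):

    public class SkewEstimateSketch {
        public static void main(String[] args) {
            long keySampleCount = 40;          // samples seen for one key
            long totalSampleCount = 1700;      // totalSampleCount_
            long totalInputRows = 10000000L;   // sum from the marker rows
            long totalSampleMSize = 40 * 64;   // bytes of those samples in memory
            // 0.3 stands in for heapPercentage_
            long totalMemory = (long) (Runtime.getRuntime().maxMemory() * 0.3);

            // average in-memory size per tuple of this key
            double avgM = totalSampleMSize / (double) keySampleCount;
            // tuples of this key that fit in one reducer's memory
            long tupleMCount = (long) (totalMemory / avgM);
            // estimated total tuples for this key, scaled by the sample ratio
            long keyTupleCount = (long) (((double) keySampleCount / totalSampleCount)
                    * totalInputRows);
            // reducers needed so each holds at most tupleMCount tuples
            int redCount = (int) Math.ceil((double) keyTupleCount / tupleMCount);

            System.out.println("rows for key: " + keyTupleCount
                    + ", reducers needed: " + redCount);
        }
    }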

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 19 00:58:17 2009
@@ -21,65 +21,152 @@
 import java.util.ArrayList;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.PigException;
+
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.Pair;
 
 /**
- * Currently skipInterval is similar to the randomsampleloader. However, if we were to use an
- * uniform distribution, we could precompute the intervals and read it from a file.
- *
+ * See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
  */
-//XXX : FIXME - make this work with new load-store redesign
 public class PoissonSampleLoader extends SampleLoader {
 	
-	// Base number of samples needed
-	private long baseNumSamples;
+        // marker string for the special row carrying the total number of rows;
+        // it is the value of the second-to-last column in that row
+        public static final String NUMROWS_TUPLE_MARKER = 
+            "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
+        
+        //num of rows sampled so far
+        private int numRowsSampled = 0;
+        
+        //average size of tuple in memory, for tuples sampled
+        private long avgTupleMemSz = 0;
+	
+	//current row number 
+	private long rowNum = 0;
 	
-	/// Count of the map splits
-	private static final String MAPSPLITS_COUNT = "pig.mapsplits.count";
+	// number of tuples to skip after each sample
+        long skipInterval = -1;
+
+	// bytes in input to skip after every sample. 
+        // divide this by avgTupleMemSz to get skipInterval 
+	private long memToSkipPerSample = 0;
 	
-	/// Conversion factor accounts for the various encodings, compression etc
-	private static final String CONV_FACTOR = "pig.inputfile.conversionfactor";
+	// has the special row with row number information been returned
+	private boolean numRowSplTupleReturned = false;
 	
 	/// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
 	private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
 	
+	// 17 is not a magic number. It can be obtained by using a Poisson cumulative distribution function with the mean
+        // set to 10 (empirically, the minimum number of samples) and the confidence set to 95%
+        private static final int DEFAULT_SAMPLE_RATE = 17;
+        
+        private int sampleRate = DEFAULT_SAMPLE_RATE;
+	
 	/// % of memory available for the input data. This is currenty equal to the memory available
 	/// for the skewed join
 	private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
+
+        private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+		
+        // new Sample tuple
+        private Tuple newSample = null;
+        
+//	private final Log log = LogFactory.getLog(getClass());
 	
-	// 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
-	// set to 10 (emperically, minimum number of samples) and the confidence set to 95%
-	private static final int DEFAULT_SAMPLE_RATE = 17;
-	
-	// By default the data is multiplied by 2 to account for the encoding
-	private static final int DEFAULT_CONV_FACTOR = 2;
-	
-	private final Log log = LogFactory.getLog(getClass());
 
 	public PoissonSampleLoader(String funcSpec, String ns) {
 		super(funcSpec);
 		super.setNumSamples(Integer.valueOf(ns)); // will be overridden
 	}
-	
-	// n is the number of map tasks
-	@Override
-	public void setNumSamples(int n) {
-		super.setNumSamples(n); // will be overridden
+
+	/* (non-Javadoc)
+	 * @see org.apache.pig.LoadFunc#getNext()
+	 */
+	public Tuple getNext() throws IOException {
+	    if(numRowSplTupleReturned){
+	        // row num special row has been returned after all inputs 
+	        // were read, nothing more to read 
+	        return null;
+	    }
+	    
+
+	    if(skipInterval == -1){
+	        //select first tuple as sample and calculate
+	        // number of tuples to be skipped 
+	        Tuple t = loader.getNext();
+	        if(t == null)
+	            return createNumRowTuple(null);
+	        long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+	        memToSkipPerSample = availRedMem/sampleRate;
+	        updateSkipInterval(t);
+                
+	        rowNum++;
+                newSample = t;
+	    }
+
+	    // skip tuples
+	    for(long numSkipped  = 0; numSkipped < skipInterval; numSkipped++){
+	        if(!skipNext()){
+	            return createNumRowTuple(newSample);
+	        }
+	        rowNum++;
+	    }
+	    
+	    // skipped enough, get new sample
+	    Tuple t = loader.getNext();
+	    if(t == null)
+	        return createNumRowTuple(newSample);
+	    updateSkipInterval(t);
+	    rowNum++;
+            Tuple currentSample = newSample;
+            newSample = t;
+            return currentSample;
 	}
-	
+
+	/**
+	 * Update the average tuple size based on the newly sampled tuple t
+	 * and recalculate skipInterval
+	 * @param t - tuple
+	 */
+	private void updateSkipInterval(Tuple t) {
+	    avgTupleMemSz = 
+	        ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
+	    skipInterval = memToSkipPerSample/avgTupleMemSz;
+	    
+            // skip fewer rows for the first few samples, to reduce the
+            // probability that an unrepresentatively small first tuple
+            // (much smaller than the rest) results in very few rows being
+            // sampled. Sampling a little extra is OK.
+	    if(numRowsSampled < 5)
+	        skipInterval = skipInterval/(10-numRowsSampled);
+            ++numRowsSampled;
+
+	}
+
+	/**
+	 * @param sample - sample tuple
+	 * @return - Tuple appended with special marker string column, num-rows column
+	 * @throws ExecException
+	 */
+	private Tuple createNumRowTuple(Tuple sample) throws ExecException {
+	    if(rowNum == 0 || sample == null)
+	        return null;
+	    TupleFactory factory = TupleFactory.getInstance();
+	    Tuple t = factory.newTuple(sample.size() + 2);
+	    for(int i=0; i<sample.size(); i++){
+	        t.set(i, sample.get(i));
+	    }
+	    t.set(sample.size(), NUMROWS_TUPLE_MARKER);
+	    t.set(sample.size() + 1, rowNum);
+	    numRowSplTupleReturned = true;
+	    return t;
+	}
+
 	/**
 	 * Computes the number of samples for the loader
 	 * 
@@ -89,100 +176,28 @@
 	 */
 	@Override
 	public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
-		int numSplits, convFactor, sampleRate;
-		Properties pcProps = pc.getProperties();
-		
-		// Set default values for the various parameters
-		try {
-			numSplits = Integer.valueOf(pcProps.getProperty(MAPSPLITS_COUNT));
-		} catch (NumberFormatException e) {
-			String msg = "Couldn't retrieve the number of maps in the job";
-			throw new ExecException(msg);
-		}
-		
-		try {
-			convFactor = Integer.valueOf(pcProps.getProperty(CONV_FACTOR));
-		} catch (NumberFormatException e) {
-			convFactor = DEFAULT_CONV_FACTOR;
-		}
-		
-		try {
-			sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
-		} catch (NumberFormatException e) {
-			sampleRate = DEFAULT_SAMPLE_RATE;
-		}
-		
-		// % of memory available for the records
-		float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-                if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
-		    try {
-                        heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
-                    }catch(NumberFormatException e) {
-			// ignore, use default value
-                    }
-                }
-		
-		// we are only concerned with the first input for skewed join
-		String fname = inputs.get(0).first.getFileName();
-		
-		// calculate the base number of samples
-		try {
-			float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
-			baseNumSamples = (long) Math.ceil(1.0 / f);
-		} catch (IOException e) {
-			int errCode = 2175;
-			String msg = "Internal error. Could not retrieve file size for the sampler.";
-			throw new ExecException(msg, errCode, PigException.BUG);
-		} catch (ArithmeticException e) {
-			int errCode = 1105;
-			String msg = "Heap percentage / Conversion factor cannot be set to 0";
-			throw new ExecException(msg,errCode,PigException.INPUT);
-		}
-		
-		// set the number of samples
-		int n = (int) ((baseNumSamples * sampleRate) / numSplits);
-		
-		// set the minimum number of samples to 1
-		n = (n > 1) ? n : 1;
-		setNumSamples(n);
+	    Properties pcProps = pc.getProperties();
+
+	    // % of memory available for the records
+	    heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+	    if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+	        try {
+	            heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+	        }catch(NumberFormatException e) {
+	            // ignore, use default value
+	        }
+	    }
+
+	    try {
+	        sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
+	    } catch (NumberFormatException e) {
+	        sampleRate = DEFAULT_SAMPLE_RATE;
+	    }
+
 	}
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#getLoadCaster()
-     */
-    @Override
-    public LoadCaster getLoadCaster() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
-     */
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        // TODO Auto-generated method stub
-        
-    }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
-     */
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        // TODO Auto-generated method stub
-        
-    }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path)
-     */
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-	
+
 
 }
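
The loader's skip length is driven by a running average of sampled tuple
memory size. A standalone sketch of updateSkipInterval()'s bookkeeping
(0.3 and 17 stand in for the heap percentage and sample rate defaults; the
tuple sizes are invented):

    public class SkipIntervalSketch {
        public static void main(String[] args) {
            double heapPerc = 0.3;  // stand-in for the configured heap %
            int sampleRate = 17;    // DEFAULT_SAMPLE_RATE
            long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
            long memToSkipPerSample = availRedMem / sampleRate;

            long avgTupleMemSz = 0;
            int numRowsSampled = 0;
            long[] tupleSizes = {120, 95, 130, 110, 100}; // getMemorySize() stand-ins
            for (long size : tupleSizes) {
                // running average, as updateSkipInterval() maintains it
                avgTupleMemSz = (avgTupleMemSz * numRowsSampled + size)
                        / (numRowsSampled + 1);
                long skipInterval = memToSkipPerSample / avgTupleMemSz;
                // damp the first few intervals so a small first tuple does
                // not starve the sample
                if (numRowsSampled < 5) {
                    skipInterval = skipInterval / (10 - numRowsSampled);
                }
                numRowsSampled++;
                System.out.println("after " + numRowsSampled
                        + " samples, skip " + skipInterval + " rows");
            }
        }
    }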

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Nov 19 00:58:17 2009
@@ -18,23 +18,23 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
+import java.util.Random;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
 
 /**
- * A loader that samples the data.  This loader can subsume loader that
- * can handle starting in the middle of a record.  Loaders that can
- * handle this should implement the SamplableLoader interface.
+ * A loader that samples the data.  
+ * It randomly samples tuples from input. The number of tuples to be sampled
+ * has to be set before the first call to getNext().
+ * See the documentation of the getNext() call.
  */
-//XXX : FIXME - make this work with new load-store redesign
 public class RandomSampleLoader extends SampleLoader {
  
+    //array to store the sample tuples
+    Tuple [] samples = null;
+    //index into samples array to the next sample to be returned 
+    protected int nextSampleIdx= 0;
+    
     /**
      * Construct with a class of loader to use.
      * @param funcSpec func spec of the loader to use.
@@ -49,61 +49,67 @@
         // set the number of samples
         super.setNumSamples(Integer.valueOf(ns));
     }
-    
-    
-    @Override
-    public void setNumSamples(int n) {
-    	// Setting it to 100 as default for order by
-    	super.setNumSamples(100);
-    }
 
     /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#getInputFormat()
+     * @see org.apache.pig.LoadFunc#getNext()
+     * Allocate a buffer for numSamples elements, populate it with the 
+     * first numSamples tuples, and continue scanning rest of the input.
+     * For every ith next() call, we generate a random number r s.t. 0<=r<i,
+     * and if r<numSamples we insert the new tuple into our buffer at position r.
+     * This gives us a random sample of the tuples in the partition.
      */
     @Override
-    public InputFormat getInputFormat() throws IOException {
-        // TODO Auto-generated method stub
-        return loader.getInputFormat();
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#getLoadCaster()
-     */
-    @Override
-    public LoadCaster getLoadCaster() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
-     */
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        // TODO Auto-generated method stub
+    public Tuple getNext() throws IOException {
+
+        if(samples != null){
+            return getSample();
+        }
+        //else collect samples
+        samples = new Tuple[numSamples];
         
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
-     */
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        // TODO Auto-generated method stub
+        // populate the samples array with first numSamples tuples
+        Tuple t = null;
+        for(int i=0; i<numSamples; i++){
+            t = loader.getNext();
+            if(t == null)
+                break;
+            samples[i] = t;
+        }
         
+        // row numbers count from 1; the first numSamples rows have been read
+        int rowNum = numSamples+1;
+        Random randGen = new Random();
+
+        if(t != null){ // did not exhaust all tuples
+            while(true){
+                // collect samples until input is exhausted
+                int rand = randGen.nextInt(rowNum);
+                if(rand < numSamples){
+                    // pick this as sample
+                    Tuple sampleTuple = loader.getNext();
+                    if(sampleTuple == null)
+                        break;
+                    samples[rand] = sampleTuple;
+                }else {
+                    //skip tuple
+                    if(!skipNext())
+                        break;
+                }
+                rowNum++;
+            }
+        }        
+        
+        return getSample();
+    } 
+    
+    private Tuple getSample() {
+        if(nextSampleIdx < samples.length){
+            return samples[nextSampleIdx++];
+        }
+        else{
+            return null;
+        }
     }
 
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path)
-     */
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
  
 }
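
The getNext() comment above describes classic reservoir sampling. The same
scheme over a plain stream of integers, independent of the LoadFunc
machinery (a sketch, not Pig code):

    import java.util.Random;

    public class ReservoirSketch {
        public static void main(String[] args) {
            int numSamples = 5;
            int[] samples = new int[numSamples];
            Random randGen = new Random();

            int rowNum = 0;
            for (int t = 1; t <= 1000; t++) { // stand-in for the input rows
                rowNum++;
                if (rowNum <= numSamples) {
                    samples[rowNum - 1] = t;  // fill the buffer first
                } else {
                    // keep the new row with probability numSamples/rowNum
                    int rand = randGen.nextInt(rowNum);
                    if (rand < numSamples) {
                        samples[rand] = t;
                    }
                }
            }
            for (int s : samples) {
                System.out.println(s);
            }
        }
    }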

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Thu Nov 19 00:58:17 2009
@@ -19,36 +19,32 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Map;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.SamplableLoader;
-import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Pair;
 
 /**
  * Abstract class that specifies the interface for sample loaders
  *
  */
-//XXX : FIXME - make this work with new load-store redesign
 public abstract class SampleLoader implements LoadFunc {
 
-	protected int numSamples;
-	protected long skipInterval;
+    // number of samples to be sampled
+    protected int numSamples;
+    
     protected LoadFunc loader;
-	private TupleFactory factory;
-	private boolean initialized = false;
-
+    
+    // RecordReader used by the underlying loader
+    private RecordReader<?, ?> recordReader= null;
     
     public SampleLoader(String funcSpec) {
     	loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
@@ -66,22 +62,49 @@
      * @see org.apache.pig.LoadFunc#getInputFormat()
      */
     @Override
-    public InputFormat getInputFormat() throws IOException {
+    public InputFormat<?,?> getInputFormat() throws IOException {
         return loader.getInputFormat();
+    } 
+
+    public boolean skipNext() throws IOException {
+        try {
+            return recordReader.nextKeyValue();
+        } catch (InterruptedException e) {
+            throw new IOException("Error getting input",e);
+        }
+    }
+    
+    public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc)
+    throws ExecException {
     }
+    
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return loader.getLoadCaster();
+    }
+    
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir)
+            throws IOException {
+        return loader.relativeToAbsolutePath(location, curDir);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
+     */
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+        loader.prepareToRead(reader, split);
+        this.recordReader = reader;
+    }
+    
 
     /* (non-Javadoc)
-	 * @see org.apache.pig.LoadFunc#getNext()
-	 */
-	public Tuple getNext() throws IOException {
-	   // estimate how many tuples there are in the map
-	   // based on the 
-	    return null;   
-	}
-
-	public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
-		// TODO Auto-generated method stub
-		
-	}
+     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        loader.setLocation(location, job);
+    }
 
 }
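
SampleLoader is now a plain delegating wrapper: the LoadFunc methods forward
to the wrapped loader, and prepareToRead() additionally keeps the
RecordReader so skipNext() can advance past rows without materializing
tuples. A toy sketch of that pattern, with hypothetical stand-ins for
LoadFunc and RecordReader:

    import java.util.Iterator;

    public class DelegatingSamplerSketch {
        interface Source {                          // stand-in for LoadFunc
            String getNext();
        }

        static abstract class Sampler implements Source {
            protected final Source loader;          // the wrapped loader
            private Iterator<String> recordReader;  // stand-in for RecordReader

            Sampler(Source loader) {
                this.loader = loader;
            }

            void prepareToRead(Iterator<String> reader) {
                this.recordReader = reader;         // remembered for skipNext()
            }

            // advance the reader without building a record, mirroring
            // SampleLoader.skipNext()
            protected boolean skipNext() {
                if (recordReader.hasNext()) {
                    recordReader.next();
                    return true;
                }
                return false;
            }
        }
    }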

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java Thu Nov 19 00:58:17 2009
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * UDF to get memory and disk size of a tuple.
- * It is used by skewed join.
- * 
- */
-
-public class TupleSize extends EvalFunc<Tuple>{          
-
-	private TupleFactory factory;
-	
-    public TupleSize() {
-    	factory = TupleFactory.getInstance();
-    }      
-
-    /**
-     * Get memory size and disk size of input tuple
-     */    
-    public Tuple exec(Tuple in) throws IOException {
-    	if (in == null) {
-    		return null;
-    	}
-    	
-    	Tuple t = factory.newTuple(2);
-    	t.set(0, in.getMemorySize());
-    	t.set(1, in.get(in.size()-1));
-    	    	
-    	return t;
-    }
-    
-    public Type getReturnType(){
-        return Tuple.class;
-    }       
-}