Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/19 01:58:18 UTC
svn commit: r882021 - in /hadoop/pig/branches/load-store-redesign/src/org/apache/pig: backend/hadoop/executionengine/mapReduceLayer/ backend/hadoop/executionengine/physicalLayer/relationalOperators/ impl/builtin/
Author: pradeepkth
Date: Thu Nov 19 00:58:17 2009
New Revision: 882021
URL: http://svn.apache.org/viewvc?rev=882021&view=rev
Log:
PIG-966: load-store-redesign branch: change SampleLoader and subclasses to work with new LoadFunc interface (thejas via pradeepkth)
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 19 00:58:17 2009
@@ -44,7 +44,7 @@
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.builtin.MergeJoinIndexer;
-import org.apache.pig.impl.builtin.TupleSize;
+import org.apache.pig.impl.builtin.GetMemNumRows;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
@@ -1387,6 +1387,7 @@
throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.");
}
+ // change the plan to store the first join input into a temp file
FileSpec fSpec = getTempFileSpec();
MapReduceOper mro = compiledInputs[0];
POStore str = getStore();
@@ -1460,7 +1461,10 @@
}
// run POPartitionRearrange for second join table
- lr = new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ POPartitionRearrange pr =
+ new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ pr.setPigContext(pigContext);
+ lr = pr;
try {
lr.setIndex(1);
} catch (ExecException e) {
@@ -1817,7 +1821,7 @@
PhysicalPlan ep = new PhysicalPlan();
POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
- new FuncSpec(TupleSize.class.getName(), (String[])null));
+ new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
uf.setResultType(DataType.TUPLE);
ep.add(uf);
ep.add(prjStar);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Nov 19 00:58:17 2009
@@ -25,6 +25,7 @@
import java.util.Iterator;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.builtin.BinStorage;
@@ -32,6 +33,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -66,14 +68,15 @@
*
*/
private static final long serialVersionUID = 1L;
- private String partitionFile;
- private Integer totalReducers = -1;
- // ReducerMap will store the tuple, max reducer index & min reducer index
- private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
- private boolean loaded;
-
- protected static final BagFactory mBagFactory = BagFactory.getInstance();
-
+ private String partitionFile;
+ private Integer totalReducers = -1;
+ // ReducerMap will store the tuple, max reducer index & min reducer index
+ private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
+ private boolean loaded;
+
+ protected static final BagFactory mBagFactory = BagFactory.getInstance();
+ private PigContext pigContext;
+
public POPartitionRearrange(OperatorKey k) {
this(k, -1, null);
}
@@ -102,17 +105,22 @@
partitionFile = file;
}
- /* Loads the key distribution file obtained from the sampler */
- private void loadPartitionFile() throws RuntimeException {
- try {
- Integer [] redCnt = new Integer[1];
- reducerMap = MapRedUtil.loadPartitionFile(partitionFile, redCnt, null, DataType.NULL);
- totalReducers = redCnt[0];
- loaded = true;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ /* Loads the key distribution file obtained from the sampler */
+ private void loadPartitionFile() throws RuntimeException {
+ try {
+ Integer [] redCnt = new Integer[1];
+
+ reducerMap = MapRedUtil.loadPartitionFile(partitionFile,
+ redCnt,
+ ConfigurationUtil.toConfiguration(pigContext.getProperties()),
+ DataType.NULL
+ );
+ totalReducers = redCnt[0];
+ loaded = true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
@Override
public String name() {
@@ -234,6 +242,20 @@
}
/**
+ * @param pigContext the pigContext to set
+ */
+ public void setPigContext(PigContext pigContext) {
+ this.pigContext = pigContext;
+ }
+
+ /**
+ * @return the pigContext
+ */
+ public PigContext getPigContext() {
+ return pigContext;
+ }
+
+ /**
* Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java?rev=882021&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java Thu Nov 19 00:58:17 2009
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * UDF that returns the memory size of a tuple and extracts the number-of-rows
+ * value from the special tuple created by PoissonSampleLoader.
+ * It is used by skewed join.
+ *
+ */
+
+public class GetMemNumRows extends EvalFunc<Tuple>{
+
+ private TupleFactory factory;
+
+ public GetMemNumRows() {
+ factory = TupleFactory.getInstance();
+ }
+
+ /**
+ * @param in - input tuple
+ * @return - tuple containing the in-memory size of the input tuple, and numRows
+ * if the input is the specially marked tuple carrying the number-of-rows field
+ */
+ public Tuple exec(Tuple in) throws IOException {
+ if (in == null) {
+ return null;
+ }
+ long memSize = in.getMemorySize();
+ long numRows = 0;
+
+
+ // if this is the specially marked tuple, get the number of rows
+ int tSize = in.size();
+ if(tSize >=2 &&
+ in.get(tSize-2).equals(PoissonSampleLoader.NUMROWS_TUPLE_MARKER)){
+ numRows = (Long)in.get(tSize-1);
+ }
+
+ //create tuple to be returned
+ Tuple t = factory.newTuple(2);
+ t.set(0, memSize);
+ t.set(1, numRows);
+ return t;
+ }
+
+ public Type getReturnType(){
+ return Tuple.class;
+ }
+}
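
To make the contract of the new UDF concrete, here is a hedged usage sketch (the class name GetMemNumRowsSketch and the literal values are illustrative, not part of this commit). It shows the two cases exec() distinguishes: an ordinary sample tuple, for which numRows stays 0, and the marker tuple appended by PoissonSampleLoader.createNumRowTuple(), whose last two fields are NUMROWS_TUPLE_MARKER and the total row count.

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.builtin.GetMemNumRows;
    import org.apache.pig.impl.builtin.PoissonSampleLoader;

    public class GetMemNumRowsSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            GetMemNumRows udf = new GetMemNumRows();

            // ordinary sample tuple: exec() returns (memSize, 0)
            Tuple plain = tf.newTuple(2);
            plain.set(0, "key");
            plain.set(1, 42);
            System.out.println(udf.exec(plain));

            // marker tuple as built by PoissonSampleLoader.createNumRowTuple():
            // the last two fields are the marker string and the total row count,
            // so exec() returns (memSize, 1000)
            Tuple marked = tf.newTuple(4);
            marked.set(0, "key");
            marked.set(1, 42);
            marked.set(2, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
            marked.set(3, 1000L);
            System.out.println(udf.exec(marked));
        }
    }
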
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Nov 19 00:58:17 2009
@@ -33,7 +33,6 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.util.Pair;
/**
@@ -82,8 +81,6 @@
private String inputFile_;
- private long inputFileSize_;
-
private long totalSampleCount_;
private double heapPercentage_;
@@ -100,7 +97,6 @@
public PartitionSkewedKeys(String[] args) {
totalReducers_ = -1;
currentIndex_ = 0;
- inputFileSize_ = -1;
if (args != null && args.length > 0) {
heapPercentage_ = Double.parseDouble(args[0]);
@@ -123,187 +119,177 @@
* first field in the input tuple is the number of reducers
*
* second field is the *sorted* bag of samples
+ * this should be called only once
*/
public Map<String, Object> exec(Tuple in) throws IOException {
- // get size of input file in bytes
- if (inputFileSize_ == -1) {
- try {
- inputFileSize_ = FileLocalizer.getSize(inputFile_);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- Map<String, Object> output = new HashMap<String, Object>();
-
- if (in == null || in.size() == 0) {
- return null;
- }
-
- totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
- log.info("Maximum of available memory is " + totalMemory_);
-
- ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
- Tuple currentTuple = null;
- long count = 0;
- long totalMSize = 0;
- long totalDSize = 0;
- try {
- totalReducers_ = (Integer) in.get(0);
- DataBag samples = (DataBag) in.get(1);
-
- totalSampleCount_ = samples.size();
-
- log.info("inputFileSize: " + inputFileSize_);
- log.info("totalSample: " + totalSampleCount_);
- log.info("totalReducers: " + totalReducers_);
-
- int maxReducers = 0;
- Iterator<Tuple> iter = samples.iterator();
- while (iter.hasNext()) {
- Tuple t = iter.next();
- if (hasSameKey(currentTuple, t) || currentTuple == null) {
- count++;
- totalMSize += getMemorySize(t);
- totalDSize += getDiskSize(t);
- } else {
- Pair<Tuple, Integer> p = calculateReducers(currentTuple,
- count, totalMSize, totalDSize);
- Tuple rt = p.first;
- if (rt != null) {
- reducerList.add(rt);
- }
- if (maxReducers < p.second) {
- maxReducers = p.second;
- }
- count = 1;
- totalMSize = getMemorySize(t);
- totalDSize = getDiskSize(t);
- }
-
- currentTuple = t;
- }
-
- // add last key
- if (count > 0) {
- Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
- totalMSize, totalDSize);
- Tuple rt = p.first;
- if (rt != null) {
- reducerList.add(rt);
- }
- if (maxReducers < p.second) {
- maxReducers = p.second;
- }
- }
-
- if (maxReducers > totalReducers_) {
- if(pigLogger != null) {
- pigLogger.warn(this,"You need at least " + maxReducers
- + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
- } else {
- log.warn("You need at least " + maxReducers
- + " reducers to avoid spillage and run this job efficiently.");
- }
- }
-
- output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
- output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-
- log.info(output.toString());
- if (log.isDebugEnabled()) {
- log.debug(output.toString());
- }
-
- return output;
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
+ if (in == null || in.size() == 0) {
+ return null;
+ }
+ Map<String, Object> output = new HashMap<String, Object>();
+
+ totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+ log.info("Maximum of available memory is " + totalMemory_);
+
+ ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+ Tuple currentTuple = null;
+ long count = 0;
+
+ // total size in memory for tuples in sample
+ long totalSampleMSize = 0;
+
+ //total input rows for the join
+ long totalInputRows = 0;
+
+ try {
+ totalReducers_ = (Integer) in.get(0);
+ DataBag samples = (DataBag) in.get(1);
+
+ totalSampleCount_ = samples.size();
+
+ log.info("totalSample: " + totalSampleCount_);
+ log.info("totalReducers: " + totalReducers_);
+
+ int maxReducers = 0;
+
+ // first iterate the samples to find total number of rows
+ Iterator<Tuple> iter1 = samples.iterator();
+ while (iter1.hasNext()) {
+ Tuple t = iter1.next();
+ totalInputRows += (Long)t.get(t.size() - 1);
+ }
+
+ // now iterate samples to do the reducer calculation
+ Iterator<Tuple> iter2 = samples.iterator();
+ while (iter2.hasNext()) {
+ Tuple t = iter2.next();
+ if (hasSameKey(currentTuple, t) || currentTuple == null) {
+ count++;
+ totalSampleMSize += getMemorySize(t);
+ } else {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+ count, totalSampleMSize, totalInputRows);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ count = 1;
+ totalSampleMSize = getMemorySize(t);
+ }
+
+ currentTuple = t;
+ }
+
+ // add last key
+ if (count > 0) {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+ totalSampleMSize, totalInputRows);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ }
+
+ if (maxReducers > totalReducers_) {
+ if(pigLogger != null) {
+ pigLogger.warn(this,"You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+ } else {
+ log.warn("You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job efficiently.");
+ }
+ }
+
+ output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+ output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+ log.info(output.toString());
+ if (log.isDebugEnabled()) {
+ log.debug(output.toString());
+ }
+
+ return output;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
- long count, long totalMSize, long totalDSize) {
- // get average memory size per tuple
- double avgM = totalMSize / (double) count;
- // get average disk size per tuple
- double avgD = totalDSize / (double) count;
-
- // get the number of tuples that can fit into memory
- long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
- // get the number of total tuples for this key
- long tupleCount = (long) (((double) count) / totalSampleCount_
- * inputFileSize_ / avgD);
-
-
- int redCount = (int) Math.round(Math.ceil((double) tupleCount
- / tupleMCount));
-
- if (log.isDebugEnabled())
- {
- log.debug("avgM: " + avgM);
- log.debug("avgD: " + avgD);
- log.debug("count: " + count);
- log.debug("A reducer can take " + tupleMCount + " tuples and "
- + tupleCount + " tuples are find for " + currentTuple);
- log.debug("key " + currentTuple + " need " + redCount + " reducers");
- }
+ long count, long totalMSize, long totalTuples) {
+ // get average memory size per tuple
+ double avgM = totalMSize / (double) count;
+
+ // get the number of tuples that can fit into memory
+ long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+ // estimate the number of total tuples for this key
+ long keyTupleCount = (long) ( ((double) count/ totalSampleCount_) *
+ totalTuples);
+
+
+ int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
+ / tupleMCount));
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("avgM: " + avgM);
+ log.debug("tuple count: " + keyTupleCount);
+ log.debug("count: " + count);
+ log.debug("A reducer can take " + tupleMCount + " tuples and "
+ + keyTupleCount + " tuples are found for " + currentTuple);
+ log.debug("key " + currentTuple + " need " + redCount + " reducers");
+ }
+
+ // this is not a skewed key
+ if (redCount == 1) {
+ return new Pair<Tuple, Integer>(null, 1);
+ }
+
+ Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+ int i = 0;
+ try {
+ // set keys
+ for (; i < currentTuple.size() - 2; i++) {
+ t.set(i, currentTuple.get(i));
+ }
+
+ // set the min index of reducer for this key
+ t.set(i++, currentIndex_);
+ currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
+ if (currentIndex_ < 0) {
+ currentIndex_ += totalReducers_;
+ }
+ // set the max index of reducer for this key
+ t.set(i++, currentIndex_);
+ } catch (ExecException e) {
+ throw new RuntimeException("Failed to set value to tuple." + e);
+ }
- // this is not a skewed key
- if (redCount == 1) {
- return new Pair<Tuple, Integer>(null, 1);
- }
+ currentIndex_ = (currentIndex_ + 1) % totalReducers_;
- Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
- int i = 0;
- try {
- // set keys
- for (; i < currentTuple.size() - 2; i++) {
- t.set(i, currentTuple.get(i));
- }
+ Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
- // set the min index of reducer for this key
- t.set(i++, currentIndex_);
- currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
- if (currentIndex_ < 0) {
- currentIndex_ += totalReducers_;
- }
- // set the max index of reducer for this key
- t.set(i++, currentIndex_);
- } catch (ExecException e) {
- throw new RuntimeException("Failed to set value to tuple." + e);
- }
-
- currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
- Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
- return p;
+ return p;
}
// the last field of the tuple is a tuple for memory size and disk size
private long getMemorySize(Tuple t) {
- int s = t.size();
- try {
- return (Long) t.get(s - 2);
- } catch (ExecException e) {
- throw new RuntimeException(
- "Unable to retrive the size field from tuple.", e);
- }
+ int s = t.size();
+ try {
+ return (Long) t.get(s - 2);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "Unable to retrive the size field from tuple.", e);
+ }
}
- // the last field of the tuple is a tuple for memory size and disk size
- private long getDiskSize(Tuple t) {
- int s = t.size();
- try {
- return (Long) t.get(s - 1);
- } catch (ExecException e) {
- throw new RuntimeException(
- "Unable to retrive the size field from tuple.", e);
- }
- }
private boolean hasSameKey(Tuple t1, Tuple t2) {
// Have to break the tuple down and compare it field to field.
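
A hedged worked example of the new estimate in calculateReducers() (all numbers are illustrative, not taken from the commit): suppose the sorted sample bag holds totalSampleCount_ = 1,000 tuples whose num-rows fields sum to totalInputRows = 10,000,000; one key accounts for count = 200 of the samples averaging avgM = 500 bytes in memory; and totalMemory_ = 100,000,000 bytes. Then tupleMCount = 100,000,000 / 500 = 200,000 tuples per reducer, keyTupleCount = (200 / 1,000) * 10,000,000 = 2,000,000 tuples for that key, and redCount = ceil(2,000,000 / 200,000) = 10, so the key is treated as skewed and spread across 10 reducers. Note that the estimate no longer depends on inputFileSize_ or a per-tuple disk size: the sample fraction is scaled by the row count reported by the sampler instead of by inputFileSize_ / avgD.
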
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 19 00:58:17 2009
@@ -21,65 +21,152 @@
import java.util.ArrayList;
import java.util.Properties;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.PigException;
+
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.Pair;
/**
- * Currently skipInterval is similar to the randomsampleloader. However, if we were to use an
- * uniform distribution, we could precompute the intervals and read it from a file.
- *
+ * See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
*/
-//XXX : FIXME - make this work with new load-store redesign
public class PoissonSampleLoader extends SampleLoader {
- // Base number of samples needed
- private long baseNumSamples;
+ // marker string for special row with total number of rows.
+ // this will be value of first column in the special row
+ public static final String NUMROWS_TUPLE_MARKER =
+ "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
+
+ //num of rows sampled so far
+ private int numRowsSampled = 0;
+
+ //average size of tuple in memory, for tuples sampled
+ private long avgTupleMemSz = 0;
+
+ //current row number
+ private long rowNum = 0;
- /// Count of the map splits
- private static final String MAPSPLITS_COUNT = "pig.mapsplits.count";
+ // number of tuples to skip after each sample
+ long skipInterval = -1;
+
+ // bytes in input to skip after every sample.
+ // divide this by avgTupleMemSize to get skipInterval
+ private long memToSkipPerSample = 0;
- /// Conversion factor accounts for the various encodings, compression etc
- private static final String CONV_FACTOR = "pig.inputfile.conversionfactor";
+ // has the special row with row number information been returned
+ private boolean numRowSplTupleReturned = false;
/// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
+ // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
+ // set to 10 (emperically, minimum number of samples) and the confidence set to 95%
+ private static final int DEFAULT_SAMPLE_RATE = 17;
+
+ private int sampleRate = DEFAULT_SAMPLE_RATE;
+
/// % of memory available for the input data. This is currenty equal to the memory available
/// for the skewed join
private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
+
+ private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+
+ // new Sample tuple
+ private Tuple newSample = null;
+
+// private final Log log = LogFactory.getLog(getClass());
- // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
- // set to 10 (emperically, minimum number of samples) and the confidence set to 95%
- private static final int DEFAULT_SAMPLE_RATE = 17;
-
- // By default the data is multiplied by 2 to account for the encoding
- private static final int DEFAULT_CONV_FACTOR = 2;
-
- private final Log log = LogFactory.getLog(getClass());
public PoissonSampleLoader(String funcSpec, String ns) {
super(funcSpec);
super.setNumSamples(Integer.valueOf(ns)); // will be overridden
}
-
- // n is the number of map tasks
- @Override
- public void setNumSamples(int n) {
- super.setNumSamples(n); // will be overridden
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getNext()
+ */
+ public Tuple getNext() throws IOException {
+ if(numRowSplTupleReturned){
+ // row num special row has been returned after all inputs
+ // were read, nothing more to read
+ return null;
+ }
+
+
+ if(skipInterval == -1){
+ //select first tuple as sample and calculate
+ // number of tuples to be skipped
+ Tuple t = loader.getNext();
+ if(t == null)
+ return createNumRowTuple(null);
+ long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+ memToSkipPerSample = availRedMem/sampleRate;
+ updateSkipInterval(t);
+
+ rowNum++;
+ newSample = t;
+ }
+
+ // skip tuples
+ for(long numSkipped = 0; numSkipped < skipInterval; numSkipped++){
+ if(!skipNext()){
+ return createNumRowTuple(newSample);
+ }
+ rowNum++;
+ }
+
+ // skipped enough, get new sample
+ Tuple t = loader.getNext();
+ if(t == null)
+ return createNumRowTuple(newSample);
+ updateSkipInterval(t);
+ rowNum++;
+ Tuple currentSample = newSample;
+ newSample = t;
+ return currentSample;
}
-
+
+ /**
+ * Update the average tuple size based on the newly sampled tuple t
+ * and recalculate skipInterval
+ * @param t - tuple
+ */
+ private void updateSkipInterval(Tuple t) {
+ avgTupleMemSz =
+ ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
+ skipInterval = memToSkipPerSample/avgTupleMemSz;
+
+ // skip fewer rows for the first few samples, to reduce the probability
+ // that an unusually small first tuple (much smaller than the rest)
+ // results in very few samples being taken. Sampling a little extra is OK
+ if(numRowsSampled < 5)
+ skipInterval = skipInterval/(10-numRowsSampled);
+ ++numRowsSampled;
+
+ }
+
+ /**
+ * @param sample - sample tuple
+ * @return - the sample tuple with the special marker string column and the num-rows column appended
+ * @throws ExecException
+ */
+ private Tuple createNumRowTuple(Tuple sample) throws ExecException {
+ if(rowNum == 0 || sample == null)
+ return null;
+ TupleFactory factory = TupleFactory.getInstance();
+ Tuple t = factory.newTuple(sample.size() + 2);
+ for(int i=0; i<sample.size(); i++){
+ t.set(i, sample.get(i));
+ }
+ t.set(sample.size(), NUMROWS_TUPLE_MARKER);
+ t.set(sample.size() + 1, rowNum);
+ numRowSplTupleReturned = true;
+ return t;
+ }
+
/**
* Computes the number of samples for the loader
*
@@ -89,100 +176,28 @@
*/
@Override
public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
- int numSplits, convFactor, sampleRate;
- Properties pcProps = pc.getProperties();
-
- // Set default values for the various parameters
- try {
- numSplits = Integer.valueOf(pcProps.getProperty(MAPSPLITS_COUNT));
- } catch (NumberFormatException e) {
- String msg = "Couldn't retrieve the number of maps in the job";
- throw new ExecException(msg);
- }
-
- try {
- convFactor = Integer.valueOf(pcProps.getProperty(CONV_FACTOR));
- } catch (NumberFormatException e) {
- convFactor = DEFAULT_CONV_FACTOR;
- }
-
- try {
- sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
- } catch (NumberFormatException e) {
- sampleRate = DEFAULT_SAMPLE_RATE;
- }
-
- // % of memory available for the records
- float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
- if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
- try {
- heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
- }catch(NumberFormatException e) {
- // ignore, use default value
- }
- }
-
- // we are only concerned with the first input for skewed join
- String fname = inputs.get(0).first.getFileName();
-
- // calculate the base number of samples
- try {
- float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
- baseNumSamples = (long) Math.ceil(1.0 / f);
- } catch (IOException e) {
- int errCode = 2175;
- String msg = "Internal error. Could not retrieve file size for the sampler.";
- throw new ExecException(msg, errCode, PigException.BUG);
- } catch (ArithmeticException e) {
- int errCode = 1105;
- String msg = "Heap percentage / Conversion factor cannot be set to 0";
- throw new ExecException(msg,errCode,PigException.INPUT);
- }
-
- // set the number of samples
- int n = (int) ((baseNumSamples * sampleRate) / numSplits);
-
- // set the minimum number of samples to 1
- n = (n > 1) ? n : 1;
- setNumSamples(n);
+ Properties pcProps = pc.getProperties();
+
+ // % of memory available for the records
+ heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+ if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+ try {
+ heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+ }catch(NumberFormatException e) {
+ // ignore, use default value
+ }
+ }
+
+ try {
+ sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
+ } catch (NumberFormatException e) {
+ sampleRate = DEFAULT_SAMPLE_RATE;
+ }
+
}
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#getLoadCaster()
- */
- @Override
- public LoadCaster getLoadCaster() {
- // TODO Auto-generated method stub
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
- */
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) {
- // TODO Auto-generated method stub
-
- }
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
- */
- @Override
- public void setLocation(String location, Job job) throws IOException {
- // TODO Auto-generated method stub
-
- }
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path)
- */
- @Override
- public String relativeToAbsolutePath(String location, Path curDir)
- throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
+
}
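
As a hedged illustration of the new sampling rate (the values below are made up, not defaults from the commit): with a 1 GB reducer heap and heapPerc = 0.3, availRedMem is about 322,000,000 bytes, so memToSkipPerSample = availRedMem / sampleRate = 322,000,000 / 17, roughly 19 MB. If the running average tuple memory size avgTupleMemSz is 200 bytes, updateSkipInterval() sets skipInterval to about 19,000,000 / 200 = 95,000, i.e. roughly one sample per 19 MB of in-memory data, or about 17 samples per heapPerc-sized slice of reducer memory. For the first five samples the interval is additionally divided by (10 - numRowsSampled), so the loader samples more densely at the start of the split.
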
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Nov 19 00:58:17 2009
@@ -18,23 +18,23 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
+import java.util.Random;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
/**
- * A loader that samples the data. This loader can subsume loader that
- * can handle starting in the middle of a record. Loaders that can
- * handle this should implement the SamplableLoader interface.
+ * A loader that samples the data.
+ * It randomly samples tuples from the input. The number of tuples to be sampled
+ * has to be set before the first call to getNext().
+ * See the documentation of getNext().
*/
-//XXX : FIXME - make this work with new load-store redesign
public class RandomSampleLoader extends SampleLoader {
+ //array to store the sample tuples
+ Tuple [] samples = null;
+ //index into samples array to the next sample to be returned
+ protected int nextSampleIdx= 0;
+
/**
* Construct with a class of loader to use.
* @param funcSpec func spec of the loader to use.
@@ -49,61 +49,67 @@
// set the number of samples
super.setNumSamples(Integer.valueOf(ns));
}
-
-
- @Override
- public void setNumSamples(int n) {
- // Setting it to 100 as default for order by
- super.setNumSamples(100);
- }
/* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#getInputFormat()
+ * @see org.apache.pig.LoadFunc#getNext()
+ * Allocate a buffer for numSamples elements, populate it with the
+ * first numSamples tuples, and continue scanning the rest of the input.
+ * For every ith next() call, we generate a random number r s.t. 0<=r<i,
+ * and if r<numSamples we insert the new tuple into our buffer at position r.
+ * This gives us a random sample of the tuples in the partition.
*/
@Override
- public InputFormat getInputFormat() throws IOException {
- // TODO Auto-generated method stub
- return loader.getInputFormat();
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#getLoadCaster()
- */
- @Override
- public LoadCaster getLoadCaster() {
- // TODO Auto-generated method stub
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
- */
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) {
- // TODO Auto-generated method stub
+ public Tuple getNext() throws IOException {
+
+ if(samples != null){
+ return getSample();
+ }
+ //else collect samples
+ samples = new Tuple[numSamples];
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
- */
- @Override
- public void setLocation(String location, Job job) throws IOException {
- // TODO Auto-generated method stub
+ // populate the samples array with first numSamples tuples
+ Tuple t = null;
+ for(int i=0; i<numSamples; i++){
+ t = loader.getNext();
+ if(t == null)
+ break;
+ samples[i] = t;
+ }
+ // row numbering starts from 1; the next row to be read is row numSamples+1
+ int rowNum = numSamples+1;
+ Random randGen = new Random();
+
+ if(t != null){ // did not exhaust all tuples
+ while(true){
+ // collect samples until input is exhausted
+ int rand = randGen.nextInt(rowNum);
+ if(rand < numSamples){
+ // pick this as sample
+ Tuple sampleTuple = loader.getNext();
+ if(sampleTuple == null)
+ break;
+ samples[rand] = sampleTuple;
+ }else {
+ //skip tuple
+ if(!skipNext())
+ break;
+ }
+ rowNum++;
+ }
+ }
+
+ return getSample();
+ }
+
+ private Tuple getSample() {
+ if(nextSampleIdx < samples.length){
+ return samples[nextSampleIdx++];
+ }
+ else{
+ return null;
+ }
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path)
- */
- @Override
- public String relativeToAbsolutePath(String location, Path curDir)
- throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
}
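
The scheme described in the getNext() javadoc above is standard reservoir sampling. A minimal, self-contained sketch of the same idea, detached from the LoadFunc plumbing (class and method names here are illustrative only):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class ReservoirSketch {
        // keep a uniform random sample of at most k items from a stream
        public static <T> List<T> sample(Iterable<T> stream, int k) {
            List<T> reservoir = new ArrayList<T>(k);
            Random rand = new Random();
            int rowNum = 0;
            for (T item : stream) {
                rowNum++;
                if (reservoir.size() < k) {
                    // fill the buffer with the first k items
                    reservoir.add(item);
                } else {
                    // replace a random slot with probability k / rowNum
                    int r = rand.nextInt(rowNum);
                    if (r < k) {
                        reservoir.set(r, item);
                    }
                }
            }
            return reservoir;
        }
    }

The loader above differs only in that, when a row is not selected, it calls skipNext() on the underlying RecordReader instead of materializing the tuple.
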
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Thu Nov 19 00:58:17 2009
@@ -19,36 +19,32 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Map;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
-import org.apache.pig.SamplableLoader;
-import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Pair;
/**
* Abstract class that specifies the interface for sample loaders
*
*/
-//XXX : FIXME - make this work with new load-store redesign
public abstract class SampleLoader implements LoadFunc {
- protected int numSamples;
- protected long skipInterval;
+ // number of tuples to sample
+ protected int numSamples;
+
protected LoadFunc loader;
- private TupleFactory factory;
- private boolean initialized = false;
-
+
+ // RecordReader used by the underlying loader
+ private RecordReader<?, ?> recordReader= null;
public SampleLoader(String funcSpec) {
loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
@@ -66,22 +62,49 @@
* @see org.apache.pig.LoadFunc#getInputFormat()
*/
@Override
- public InputFormat getInputFormat() throws IOException {
+ public InputFormat<?,?> getInputFormat() throws IOException {
return loader.getInputFormat();
+ }
+
+ public boolean skipNext() throws IOException {
+ try {
+ return recordReader.nextKeyValue();
+ } catch (InterruptedException e) {
+ throw new IOException("Error getting input",e);
+ }
+ }
+
+ public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc)
+ throws ExecException {
}
+
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return loader.getLoadCaster();
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir)
+ throws IOException {
+ return loader.relativeToAbsolutePath(location, curDir);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
+ */
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+ loader.prepareToRead(reader, split);
+ this.recordReader = reader;
+ }
+
/* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#getNext()
- */
- public Tuple getNext() throws IOException {
- // estimate how many tuples there are in the map
- // based on the
- return null;
- }
-
- public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
- // TODO Auto-generated method stub
-
- }
+ * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ loader.setLocation(location, job);
+ }
}
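
With this restructuring, SampleLoader carries the LoadFunc plumbing by delegating to the wrapped loader, so a concrete sampler mainly supplies getNext() (and, if needed, computeSamples()). A hedged sketch of a trivial subclass that passes every tuple through unchanged (IdentitySampleLoader is illustrative and not part of the commit):

    import java.io.IOException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.builtin.SampleLoader;

    public class IdentitySampleLoader extends SampleLoader {
        public IdentitySampleLoader(String funcSpec) {
            // funcSpec names the real loader to wrap, e.g. PigStorage
            super(funcSpec);
        }

        // "sample" every tuple: just forward to the wrapped loader
        public Tuple getNext() throws IOException {
            return loader.getNext();
        }
    }
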
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java Thu Nov 19 00:58:17 2009
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * UDF to get memory and disk size of a tuple.
- * It is used by skewed join.
- *
- */
-
-public class TupleSize extends EvalFunc<Tuple>{
-
- private TupleFactory factory;
-
- public TupleSize() {
- factory = TupleFactory.getInstance();
- }
-
- /**
- * Get memory size and disk size of input tuple
- */
- public Tuple exec(Tuple in) throws IOException {
- if (in == null) {
- return null;
- }
-
- Tuple t = factory.newTuple(2);
- t.set(0, in.getMemorySize());
- t.set(1, in.get(in.size()-1));
-
- return t;
- }
-
- public Type getReturnType(){
- return Tuple.class;
- }
-}