Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/26 19:05:06 UTC
svn commit: r829883 [2/2] - in /hadoop/pig/branches/load-store-redesign: ./
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executioneng...
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java Mon Oct 26 18:05:05 2009
@@ -17,31 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.util.ObjectSerializer;
/**
* This class is the static Mapper class used by Pig
@@ -62,12 +40,12 @@
public class PigMapOnly {
- public static class Map extends PigMapBase implements
- Mapper<Text, Tuple, PigNullableWritable, Writable> {
+ public static class Map extends PigMapBase {
@Override
- public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
- oc.collect(null, tuple);
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+ oc.write(null, tuple);
}
}
}
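
The PigMapOnly change above sets the pattern for the whole commit: Pig's map-side classes stop implementing the old org.apache.hadoop.mapred.Mapper interface and move to the Hadoop 0.20 org.apache.hadoop.mapreduce API, where output goes through Context.write() and the methods may throw InterruptedException. A minimal standalone sketch of that new-API shape (the IdentityValueMapper name and the key/value types are illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // New-API mapper: no OutputCollector/Reporter parameters; the single
    // Context object carries output, configuration and progress reporting.
    public class IdentityValueMapper
            extends Mapper<Text, Text, NullWritable, Text> {

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Counterpart of the old oc.collect(null, tuple): map-only jobs
            // emit value-only records (here with an explicit NullWritable
            // key rather than a bare null).
            context.write(NullWritable.get(), value);
        }
    }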
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Oct 26 18:05:05 2009
@@ -25,42 +25,34 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.PropertyConfigurator;
-
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.WrappedIOException;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.impl.io.NullablePartitionWritable;
/**
* This class is the static Mapper & Reducer classes that
@@ -87,25 +79,29 @@
*/
public class PigMapReduce {
- public static JobConf sJobConf = null;
+ public static JobContext sJobContext = null;
+ public static Configuration sJobConf = null;
private final static Tuple DUMMYTUPLE = null;
- public static class Map extends PigMapBase implements
- Mapper<Text, Tuple, PigNullableWritable, Writable> {
+ public static class Map extends PigMapBase {
@Override
- public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
Byte index = (Byte)tuple.get(0);
PigNullableWritable key =
HDataType.getWritableComparableTypes(tuple.get(1), keyType);
NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
key.setIndex(index);
- val.setIndex(index);
- oc.collect(key, val);
+ val.setIndex(index);
+
+ oc.write(key, val);
}
}
@@ -115,12 +111,12 @@
* to be handed tuples. Hence this map class ensures that the "key" used
* in the order by is wrapped into a tuple (if it isn't already a tuple)
*/
- public static class MapWithComparator extends PigMapBase implements
- Mapper<Text, Tuple, PigNullableWritable, Writable> {
+ public static class MapWithComparator extends PigMapBase {
@Override
- public void collect(OutputCollector<PigNullableWritable, Writable> oc,
- Tuple tuple) throws ExecException, IOException {
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
Object keyTuple = null;
if(keyType != DataType.TUPLE) {
Object k = tuple.get(1);
@@ -134,110 +130,115 @@
PigNullableWritable key =
HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
key.setIndex(index);
val.setIndex(index);
- oc.collect(key, val);
+
+ oc.write(key, val);
}
}
- /**
- * Used by Skewed Join
- */
- public static class MapWithPartitionIndex extends Map implements
- Mapper<Text, Tuple, PigNullableWritable, Writable> {
+ /**
+ * Used by Skewed Join
+ */
+ public static class MapWithPartitionIndex extends Map {
@Override
- public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
- Byte tupleKeyIdx = 2;
- Byte tupleValIdx = 3;
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
+ Byte tupleKeyIdx = 2;
+ Byte tupleValIdx = 3;
Byte index = (Byte)tuple.get(0);
- Byte partitionIndex = -1;
- // for partitioning table, the partition index isn't present
- if (tuple.size() == 3) {
- //super.collect(oc, tuple);
- //return;
- tupleKeyIdx--;
- tupleValIdx--;
- } else {
- partitionIndex = (Byte)tuple.get(1);
- }
+ Byte partitionIndex = -1;
+ // for partitioning table, the partition index isn't present
+ if (tuple.size() == 3) {
+ //super.collect(oc, tuple);
+ //return;
+ tupleKeyIdx--;
+ tupleValIdx--;
+ } else {
+ partitionIndex = (Byte)tuple.get(1);
+ }
PigNullableWritable key =
HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), DataType.TUPLE);
- NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
- //key.setIndex(index);
- //NullableTuple wrappedKey = new NullableTuple((Tuple)tuple);
+ NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
- NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
+ NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
wrappedKey.setIndex(index);
- // set the partition
- wrappedKey.setPartition(partitionIndex);
- val.setIndex(index);
- oc.collect(wrappedKey, val);
- //oc.collect(key, val);
- }
-
- @Override
- protected void runPipeline(PhysicalOperator leaf) throws IOException, ExecException {
- while(true){
- Result res = leaf.getNext(DUMMYTUPLE);
-
- if(res.returnStatus==POStatus.STATUS_OK){
- // For POPartitionRearrange, the result is a bag. This operator is used for
- // skewed join
- if (res.result instanceof DataBag) {
- Iterator<Tuple> its = ((DataBag)res.result).iterator();
- while(its.hasNext()) {
- collect(outputCollector, its.next());
- }
- }else{
- collect(outputCollector, (Tuple)res.result);
- }
- continue;
- }
-
- if(res.returnStatus==POStatus.STATUS_EOP) {
- return;
- }
-
- if(res.returnStatus==POStatus.STATUS_NULL)
- continue;
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
- // close() we can do the right thing
- errorInMap = true;
- // if there is an errmessage use it
- String errMsg;
- if(res.result != null) {
- errMsg = "Received Error while " +
- "processing the map plan: " + res.result;
- } else {
- errMsg = "Received Error while " +
- "processing the map plan.";
- }
-
- int errCode = 2055;
- ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
- throw ee;
- }
- }
- }
- }
-
- public static class Reduce extends MapReduceBase
- implements
- Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+
+ // set the partition
+ wrappedKey.setPartition(partitionIndex);
+ val.setIndex(index);
+
+ oc.write(wrappedKey, val);
+ }
+
+ @Override
+ protected void runPipeline(PhysicalOperator leaf)
+ throws IOException, InterruptedException {
+
+ while(true){
+ Result res = leaf.getNext(DUMMYTUPLE);
+
+ if(res.returnStatus==POStatus.STATUS_OK){
+ // For POPartitionRearrange, the result is a bag.
+ // This operator is used for skewed join
+ if (res.result instanceof DataBag) {
+ Iterator<Tuple> its = ((DataBag)res.result).iterator();
+ while(its.hasNext()) {
+ collect(outputCollector, its.next());
+ }
+ }else{
+ collect(outputCollector, (Tuple)res.result);
+ }
+ continue;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ return;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ // remember that we had an issue so that in
+ // close() we can do the right thing
+ errorInMap = true;
+ // if there is an errmessage use it
+ String errMsg;
+ if(res.result != null) {
+ errMsg = "Received Error while " +
+ "processing the map plan: " + res.result;
+ } else {
+ errMsg = "Received Error while " +
+ "processing the map plan.";
+ }
+
+ int errCode = 2055;
+ throw new ExecException(errMsg, errCode, PigException.BUG);
+ }
+ }
+ }
+ }
+
+ public static class Reduce
+ extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+
protected final Log log = LogFactory.getLog(getClass());
//The reduce plan
@@ -255,7 +256,7 @@
ProgressableReporter pigReporter;
- protected OutputCollector<PigNullableWritable, Writable> outputCollector;
+ protected Context outputCollector;
protected boolean errorInReduce = false;
@@ -272,10 +273,13 @@
*/
@SuppressWarnings("unchecked")
@Override
- public void configure(JobConf jConf) {
- super.configure(jConf);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration jConf = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
- sJobConf = jConf;
+ sJobContext = context;
+ sJobConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
@@ -297,8 +301,6 @@
}
// till here
- long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000);
-
pigReporter = new ProgressableReporter();
if(!(rp.isEmpty())) {
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
@@ -315,37 +317,36 @@
* into key, Bag<Tuple> after converting Hadoop type key into Pig type.
* The package result is either collected as is, if the reduce plan is
* empty or after passing through the reduce plan.
- */
- public void reduce(PigNullableWritable key,
- Iterator<NullableTuple> tupIter,
- OutputCollector<PigNullableWritable, Writable> oc,
- Reporter reporter) throws IOException {
+ */
+ @Override
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ throws IOException, InterruptedException {
if (!initialized) {
initialized = true;
// cache the collector for use in runPipeline()
// which could additionally be called from close()
- this.outputCollector = oc;
- pigReporter.setRep(reporter);
+ this.outputCollector = context;
+ pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(reporter);
+ pigHadoopLogger.setReporter(context);
+
PhysicalOperator.setPigLogger(pigHadoopLogger);
for (POStore store: stores) {
MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
- impl.setReporter(reporter);
+ = new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
}
-
+
// In the case we optimize the join, we combine
// POPackage and POForeach - so we could get many
// tuples out of the getnext() call of POJoinPackage
@@ -353,72 +354,76 @@
// POJoinPackage.getNext()
if (pack instanceof POJoinPackage)
{
- pack.attachInput(key, tupIter);
+ pack.attachInput(key, tupIter.iterator());
while (true)
{
- if (processOnePackageOutput(oc))
+ if (processOnePackageOutput(context))
break;
}
}
else {
// join is not optimized, so package will
// give only one tuple out for the key
- pack.attachInput(key, tupIter);
- processOnePackageOutput(oc);
- }
+ pack.attachInput(key, tupIter.iterator());
+ processOnePackageOutput(context);
+ }
}
// return: false-more output
// true- end of processing
- public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException
- {
- try {
- Result res = pack.getNext(DUMMYTUPLE);
- if(res.returnStatus==POStatus.STATUS_OK){
- Tuple packRes = (Tuple)res.result;
-
- if(rp.isEmpty()){
- oc.collect(null, packRes);
- return false;
- }
- for (int i = 0; i < roots.length; i++) {
- roots[i].attachInput(packRes);
- }
- runPipeline(leaf);
-
- }
+ public boolean processOnePackageOutput(Context oc)
+ throws IOException, InterruptedException {
+
+ Result res = pack.getNext(DUMMYTUPLE);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ Tuple packRes = (Tuple)res.result;
- if(res.returnStatus==POStatus.STATUS_NULL) {
+ if(rp.isEmpty()){
+ oc.write(null, packRes);
return false;
}
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- int errCode = 2093;
- String msg = "Encountered error in package operator while processing group.";
- throw new ExecException(msg, errCode, PigException.BUG);
+ for (int i = 0; i < roots.length; i++) {
+ roots[i].attachInput(packRes);
}
+ runPipeline(leaf);
- if(res.returnStatus==POStatus.STATUS_EOP) {
- return true;
- }
-
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
return false;
- } catch (ExecException e) {
- throw e;
}
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ int errCode = 2093;
+ String msg = "Encountered error in package operator while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ return true;
+ }
+
+ return false;
+
}
/**
* @param leaf
- * @throws ExecException
+ * @throws InterruptedException
* @throws IOException
*/
- protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
+ protected void runPipeline(PhysicalOperator leaf)
+ throws InterruptedException, IOException {
+
while(true)
{
Result redRes = leaf.getNext(DUMMYTUPLE);
if(redRes.returnStatus==POStatus.STATUS_OK){
- outputCollector.collect(null, (Tuple)redRes.result);
+ try{
+ outputCollector.write(null, (Tuple)redRes.result);
+ }catch(Exception e) {
+ throw new IOException(e);
+ }
continue;
}
@@ -426,8 +431,9 @@
return;
}
- if(redRes.returnStatus==POStatus.STATUS_NULL)
+ if(redRes.returnStatus==POStatus.STATUS_NULL) {
continue;
+ }
if(redRes.returnStatus==POStatus.STATUS_ERR){
// remember that we had an issue so that in
@@ -446,17 +452,15 @@
throw new ExecException(msg, errCode, PigException.BUG);
}
}
-
-
}
/**
* Will be called once all the intermediate keys and values are
* processed. So right place to stop the reporter thread.
*/
- @Override
- public void close() throws IOException {
- super.close();
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ super.cleanup(context);
if(errorInReduce) {
// there was an error in reduce - just return
@@ -471,17 +475,13 @@
// This will result in nothing happening in the case
// where there is no stream in the pipeline
rp.endOfAllInput = true;
- try {
- runPipeline(leaf);
- } catch (ExecException e) {
- throw e;
- }
+ runPipeline(leaf);
}
for (POStore store: stores) {
if (!initialized) {
MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+ = new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
@@ -519,8 +519,8 @@
* and the reporter thread
*/
@Override
- public void configure(JobConf jConf) {
- super.configure(jConf);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
keyType = pack.getKeyType();
}
@@ -530,31 +530,29 @@
* The package result is either collected as is, if the reduce plan is
* empty or after passing through the reduce plan.
*/
- public void reduce(PigNullableWritable key,
- Iterator<NullableTuple> tupIter,
- OutputCollector<PigNullableWritable, Writable> oc,
- Reporter reporter) throws IOException {
+ @Override
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ throws IOException, InterruptedException {
if (!initialized) {
initialized = true;
// cache the collector for use in runPipeline()
// which could additionally be called from close()
- this.outputCollector = oc;
- pigReporter.setRep(reporter);
+ this.outputCollector = context;
+ pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(reporter);
+ pigHadoopLogger.setReporter(context);
PhysicalOperator.setPigLogger(pigHadoopLogger);
for (POStore store: stores) {
MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
- impl.setReporter(reporter);
+ = new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
@@ -574,43 +572,38 @@
}
}
- pack.attachInput(key, tupIter);
+ pack.attachInput(key, tupIter.iterator());
- try {
- Result res = pack.getNext(DUMMYTUPLE);
- if(res.returnStatus==POStatus.STATUS_OK){
- Tuple packRes = (Tuple)res.result;
-
- if(rp.isEmpty()){
- oc.collect(null, packRes);
- return;
- }
-
- rp.attachInput(packRes);
-
- List<PhysicalOperator> leaves = rp.getLeaves();
-
- PhysicalOperator leaf = leaves.get(0);
- runPipeline(leaf);
-
- }
+ Result res = pack.getNext(DUMMYTUPLE);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ Tuple packRes = (Tuple)res.result;
- if(res.returnStatus==POStatus.STATUS_NULL) {
+ if(rp.isEmpty()){
+ context.write(null, packRes);
return;
}
- if(res.returnStatus==POStatus.STATUS_ERR){
- int errCode = 2093;
- String msg = "Encountered error in package operator while processing group.";
- throw new ExecException(msg, errCode, PigException.BUG);
- }
-
+ rp.attachInput(packRes);
+
+ List<PhysicalOperator> leaves = rp.getLeaves();
+
+ PhysicalOperator leaf = leaves.get(0);
+ runPipeline(leaf);
- } catch (ExecException e) {
- throw e;
}
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ return;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ int errCode = 2093;
+ String msg = "Encountered error in package operator while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
}
}
-
+
}
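
For readers following the port, the Reduce classes above adopt the standard new-API lifecycle: setup(Context) replaces configure(JobConf), reduce() takes an Iterable plus a Context instead of an Iterator, OutputCollector and Reporter, and cleanup(Context) replaces close(). A self-contained sketch of that lifecycle under the same conventions (SumReducer is illustrative, not from this commit):

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // One-time initialization; configuration is reached through
            // context.getConfiguration() instead of a JobConf argument.
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            // Context.write() replaces OutputCollector.collect().
            context.write(key, new IntWritable(sum));
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Teardown that the old API performed in close(); this is where
            // PigMapReduce.Reduce now drains its pipeline and closes stores.
        }
    }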
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon Oct 26 18:05:05 2009
@@ -19,16 +19,23 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.text.NumberFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.pig.PigException;
import org.apache.pig.StoreFunc;
@@ -37,6 +44,7 @@
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tools.bzip2r.CBZip2OutputStream;
/**
@@ -45,16 +53,31 @@
* image of PigInputFormat having RecordWriter instead
* of a RecordReader.
*/
-public class PigOutputFormat implements OutputFormat<WritableComparable, Tuple> {
- public static final String PIG_OUTPUT_FUNC = "pig.output.func";
+@SuppressWarnings("unchecked")
+public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
+ /** hadoop job output directory */
+ public static final String MAPRED_OUTPUT_DIR = "mapred.output.dir";
+ /** hadoop partition number */
+ public static final String MAPRED_TASK_PARTITION = "mapred.task.partition";
+
+ /** the temporary directory for the multi store */
+ public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir";
+ /** the relative path that can be used to build a temporary
+ * place to store the output from a number of map-reduce tasks*/
+ public static final String PIG_TMP_PATH = "pig.tmp.path";
+
+ private FileOutputCommitter committer = null;
+
+ private final Log log = LogFactory.getLog(getClass());
+
/**
* In general, the mechanism for an OutputFormat in Pig to get hold of the storeFunc
* and the metadata information (for now schema and location provided for the store in
* the pig script) is through the following Utility static methods:
- * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(JobConf)}
+ * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(Configuration)}
* - this will get the {@link org.apache.pig.StoreFunc} reference to use in the RecordWriter.write()
- * {@link MapRedUtil#getStoreConfig(JobConf)} - this will get the {@link org.apache.pig.StoreConfig}
+ * {@link MapRedUtil#getStoreConfig(Configuration)} - this will get the {@link org.apache.pig.StoreConfig}
* reference which has metadata like the location (the string supplied with store statement in the script)
* and the {@link org.apache.pig.impl.logicalLayer.schema.Schema} of the data. The OutputFormat
* should NOT use the location in the StoreConfig to write the output if the location represents a
@@ -66,18 +89,12 @@
* which will provide a safe output directory into which the OutputFormat should write
* the part file (given by the name argument in the getRecordWriter() call).
*/
- public RecordWriter<WritableComparable, Tuple> getRecordWriter(FileSystem fs, JobConf job,
- String name, Progressable progress) throws IOException {
- Path outputDir = FileOutputFormat.getWorkOutputPath(job);
- return getRecordWriter(fs, job, outputDir, name, progress);
- }
-
- public PigRecordWriter getRecordWriter(FileSystem fs, JobConf job,
- Path outputDir, String name, Progressable progress)
- throws IOException {
- StoreFunc store = MapRedUtil.getStoreFunc(job);
-
- String parentName = FileOutputFormat.getOutputPath(job).getName();
+ private PigRecordWriter getRecordWriter(FileSystem fs, TaskAttemptContext context,
+ Path outputDir, String name) throws IOException {
+
+ StoreFunc store = MapRedUtil.getStoreFunc(context.getConfiguration());
+
+ String parentName = FileOutputFormat.getOutputPath(context).getName();
int suffixStart = parentName.lastIndexOf('.');
if (suffixStart != -1) {
String suffix = parentName.substring(suffixStart);
@@ -85,22 +102,20 @@
name = name + suffix;
}
}
- return new PigRecordWriter(fs, new Path(outputDir, name), store);
- }
- public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
- // TODO We really should validate things here
- return;
+ return new PigRecordWriter(fs, new Path(outputDir, name), store);
}
- static public class PigRecordWriter implements
- RecordWriter<WritableComparable, Tuple> {
+ @SuppressWarnings("unchecked")
+ static public class PigRecordWriter
+ extends RecordWriter<WritableComparable, Tuple> {
+
private OutputStream os = null;
private StoreFunc sfunc = null;
public PigRecordWriter(FileSystem fs, Path file, StoreFunc sfunc)
- throws IOException {
+ throws IOException {
this.sfunc = sfunc;
fs.delete(file, true);
this.os = fs.create(file);
@@ -115,17 +130,66 @@
* We only care about the values, so we are going to skip the keys when
* we write.
*
- * @see org.apache.hadoop.mapred.RecordWriter#write(Object, Object)
+ * @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object)
*/
+ @Override
public void write(WritableComparable key, Tuple value)
- throws IOException {
+ throws IOException, InterruptedException{
this.sfunc.putNext(value);
}
- public void close(Reporter reporter) throws IOException {
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
sfunc.finish();
- os.close();
+ os.close();
}
}
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ if (committer == null) {
+ Path output = null;
+ if (context.getConfiguration().get(PIG_MAPRED_OUTPUT_DIR) != null) {
+ output = new Path(context.getConfiguration().get(PIG_MAPRED_OUTPUT_DIR));
+ } else {
+ output = new Path(context.getConfiguration().get(MAPRED_OUTPUT_DIR));
+ }
+ committer = new FileOutputCommitter(output, context);
+ }
+ return committer;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public RecordWriter getRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ FileOutputCommitter committer = (FileOutputCommitter)getOutputCommitter(context);
+ Path outputDir = committer.getWorkPath();
+ Configuration conf = context.getConfiguration();
+ String tmpPath = conf.get(PIG_TMP_PATH);
+ if (tmpPath != null) {
+ outputDir = new Path(committer.getWorkPath(), tmpPath);
+ }
+ return getRecordWriter(FileSystem.get(conf), context, outputDir, getPartName(conf));
+ }
+
+ private String getPartName(Configuration conf) {
+ int partition = conf.getInt(MAPRED_TASK_PARTITION, -1);
+
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+
+ return "part-" + numberFormat.format(partition);
+ }
+
}
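
The getPartName() method above mirrors Hadoop's part-file naming: task partition n becomes part-0000n, zero-padded to five digits with digit grouping disabled. A tiny standalone illustration of that formatting (partition 7 is an arbitrary example):

    import java.text.NumberFormat;

    public class PartNameDemo {
        public static void main(String[] args) {
            NumberFormat nf = NumberFormat.getInstance();
            nf.setMinimumIntegerDigits(5); // zero-pad to five digits
            nf.setGroupingUsed(false);     // no "1,234"-style separators
            System.out.println("part-" + nf.format(7)); // prints part-00007
        }
    }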
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java Mon Oct 26 18:05:05 2009
@@ -17,17 +17,17 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
public class ProgressableReporter implements PigProgressable {
- Reporter rep;
-
+ Progressable rep;
+
public ProgressableReporter(){
}
-
- public ProgressableReporter(Reporter rep) {
+
+ public ProgressableReporter(Progressable rep) {
super();
this.rep = rep;
}
@@ -38,10 +38,10 @@
}
public void progress(String msg) {
- rep.setStatus(msg);
+
}
- public void setRep(Reporter rep) {
+ public void setRep(Progressable rep) {
this.rep = rep;
}
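
ProgressableReporter now wraps the narrower org.apache.hadoop.util.Progressable interface, which exposes only progress(); that is why progress(String msg) above becomes a no-op, since setStatus() existed only on the old Reporter. A minimal sketch of a wrapper that keeps a task alive through Progressable (the KeepAlive name is illustrative):

    import org.apache.hadoop.util.Progressable;

    // Progressable offers a single zero-argument method; anything that
    // needs status strings must hold a richer object such as a Context.
    public class KeepAlive {
        private final Progressable rep;

        public KeepAlive(Progressable rep) {
            this.rep = rep;
        }

        public void tick() {
            if (rep != null) {
                rep.progress(); // signal liveness, no message payload
            }
        }
    }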
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Mon Oct 26 18:05:05 2009
@@ -28,17 +28,19 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Set;
import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.Slice;
@@ -47,7 +49,6 @@
import org.apache.pig.backend.executionengine.PigSlice;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
@@ -57,14 +58,20 @@
/**
* Wraps a {@link Slice} in an {@link InputSplit} so it's usable by hadoop.
*/
-public class SliceWrapper implements InputSplit {
+public class SliceWrapper extends InputSplit implements Writable {
private int index;
private ExecType execType;
private Slice wrapped;
private transient FileSystem fs;// transient so it isn't serialized
- private transient JobConf lastConf;
+ private transient Configuration lastConf;
private ArrayList<OperatorKey> targetOps;
+
+ // XXX hadoop 20 new API integration: get around a hadoop 20 bug
+ // by setting total number of splits to each split so that it can
+ // be passed to the back-end. This value is needed
+ // by PoissonSampleLoader to compute the number of samples
+ private int totalSplits;
public SliceWrapper() {
// for deserialization
@@ -77,22 +84,32 @@
this.fs = fs;
this.targetOps = targetOps;
}
-
+
public int getIndex() {
return index;
}
+ public void setTotalSplits(int t) {
+ totalSplits = t;
+ }
+
+ public int getTotalSplits() {
+ return totalSplits;
+ }
+
+ @Override
public long getLength() throws IOException {
return wrapped.getLength();
}
+ @Override
public String[] getLocations() throws IOException {
if(wrapped instanceof PigSlice) {
Set<String> locations = new HashSet<String>();
for (String loc : wrapped.getLocations()) {
Path path = new Path(loc);
FileStatus status = fs.getFileStatus(path);
- BlockLocation[] b = fs.getFileBlockLocations(status, wrapped.getStart(), wrapped.getLength());
+ BlockLocation[] b = fs.getFileBlockLocations(status, wrapped.getStart(), wrapped.getLength());
int total = 0;
for (int i = 0; i < b.length; i++) {
total += b[i].getHosts().length;
@@ -108,76 +125,84 @@
return locations.toArray(new String[locations.size()]);
} else {
return wrapped.getLocations();
- }
-
-
+ }
}
- public JobConf getJobConf() {
+ public Configuration getJobConf() {
return lastConf;
}
+
@SuppressWarnings("unchecked")
- public RecordReader<Text, Tuple> makeReader(JobConf job) throws IOException {
- lastConf = job;
- DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
+ public RecordReader<Text, Tuple> makeReader(Configuration conf) throws IOException {
+ lastConf = conf;
+ DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(conf));
+
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
if(execType == ExecType.MAPREDUCE)
- store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
- PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
- wrapped.init(store);
+ store.setActiveContainer(store.asContainer("/user/" + conf.get("user.name")));
+ PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
+ wrapped.init(store);
- job.set("map.target.ops", ObjectSerializer.serialize(targetOps));
// Mimic org.apache.hadoop.mapred.FileSplit if feasible...
String[] locations = wrapped.getLocations();
if (locations.length > 0) {
- job.set("map.input.file", locations[0]);
- job.setLong("map.input.start", wrapped.getStart());
- job.setLong("map.input.length", wrapped.getLength());
+ conf.set("map.input.file", locations[0]);
+ conf.setLong("map.input.start", wrapped.getStart());
+ conf.setLong("map.input.length", wrapped.getLength());
}
- return new RecordReader<Text, Tuple>() {
-
- TupleFactory tupFac = TupleFactory.getInstance();
- public void close() throws IOException {
- wrapped.close();
- }
+ return new TupleReader();
+ }
+
+ public class TupleReader extends RecordReader<Text, Tuple> {
+ private Tuple current;
- public Text createKey() {
- return null; // we never use the key!
- }
+ TupleFactory tupFac = TupleFactory.getInstance();
+
+ @Override
+ public void close() throws IOException {
+ wrapped.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return wrapped.getProgress();
+ }
- public Tuple createValue() {
- return tupFac.newTuple();
- }
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
- public long getPos() throws IOException {
- return wrapped.getPos();
- }
+ @Override
+ public Tuple getCurrentValue() throws IOException, InterruptedException {
+ return current;
+ }
- public float getProgress() throws IOException {
- return wrapped.getProgress();
- }
+ @Override
+ public void initialize(InputSplit inputsplit,
+ TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException {
+ }
- public boolean next(Text key, Tuple value) throws IOException {
- return wrapped.next(value);
- }
- };
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ Tuple t = tupFac.newTuple();
+ boolean result = wrapped.next(t);
+ current = t;
+ return result;
+ }
}
@SuppressWarnings("unchecked")
+ @Override
public void readFields(DataInput is) throws IOException {
execType = (ExecType) readObject(is);
targetOps = (ArrayList<OperatorKey>) readObject(is);
index = is.readInt();
wrapped = (Slice) readObject(is);
- }
-
- private IOException wrapException(Exception e) {
- IOException newE = new IOException(e.getMessage());
- newE.initCause(e);
- return newE;
+ totalSplits = is.readInt();
}
private Object readObject(DataInput is) throws IOException {
@@ -194,11 +219,13 @@
}
}
+ @Override
public void write(DataOutput os) throws IOException {
writeObject(execType, os);
writeObject(targetOps, os);
os.writeInt(index);
writeObject(wrapped, os);
+ os.writeInt(totalSplits);
}
private void writeObject(Serializable obj, DataOutput os)
@@ -218,4 +245,7 @@
return wrapped;
}
+ public List<OperatorKey> getTargetOperatorKeyList() {
+ return targetOps;
+ }
}
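
The TupleReader above follows the new-API RecordReader contract: the framework drives reading with nextKeyValue() and then fetches getCurrentKey()/getCurrentValue(), instead of passing reusable key/value objects into next(key, value) as the old interface did. A self-contained sketch of that pull-based shape (CountingReader and its fixed three records are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CountingReader extends RecordReader<LongWritable, Text> {
        private static final long LIMIT = 3;
        private long row = 0;
        private LongWritable key;
        private Text value;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext ctx) {
            // A real reader would open its underlying stream here.
        }

        @Override
        public boolean nextKeyValue() {
            if (row >= LIMIT) {
                return false;
            }
            key = new LongWritable(row);
            value = new Text("record-" + row);
            row++;
            return true;
        }

        @Override
        public LongWritable getCurrentKey() { return key; }

        @Override
        public Text getCurrentValue() { return value; }

        @Override
        public float getProgress() { return row / (float) LIMIT; }

        @Override
        public void close() throws IOException { }
    }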
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Oct 26 18:05:05 2009
@@ -18,34 +18,16 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.hadoop.io.RawComparator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.NullableBytesWritable;
-import org.apache.pig.impl.io.NullableDoubleWritable;
-import org.apache.pig.impl.io.NullableFloatWritable;
-import org.apache.pig.impl.io.NullableIntWritable;
-import org.apache.pig.impl.io.NullableLongWritable;
-import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.NullablePartitionWritable;
@@ -62,71 +44,79 @@
* For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
* in a round robin manner.
*/
-public class SkewedPartitioner implements Partitioner<PigNullableWritable, Writable> {
- Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
- static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
- Integer totalReducers;
+public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
+ Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
+ static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
+ Integer totalReducers;
+ Configuration conf;
+ @Override
public int getPartition(PigNullableWritable wrappedKey, Writable value,
int numPartitions) {
- // for streaming tables, return the partition index blindly
- if (wrappedKey instanceof NullablePartitionWritable && ((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
- return (int) ((NullablePartitionWritable)wrappedKey).getPartition();
- }
-
- // for partition table, compute the index based on the sampler output
- Pair <Integer, Integer> indexes;
- Integer curIndex = -1;
- Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
-
- // extract the key from nullablepartitionwritable
- PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
-
- try {
- keyTuple.set(0, key.getValueAsPigType());
- } catch (ExecException e) {
- return -1;
- }
-
- // if the key is not null and key
- if (key instanceof NullableTuple && key.getValueAsPigType() != null) {
- keyTuple = (Tuple)key.getValueAsPigType();
- }
-
- indexes = reducerMap.get(keyTuple);
- // if the reducerMap does not contain the key, do the default hash based partitioning
- if (indexes == null) {
- return (Math.abs(keyTuple.hashCode()) % totalReducers);
- }
-
- if (currentIndexMap.containsKey(keyTuple)) {
- curIndex = currentIndexMap.get(keyTuple);
- }
-
- if (curIndex >= (indexes.first + indexes.second) || curIndex == -1) {
- curIndex = indexes.first;
- } else {
- curIndex++;
- }
-
- // set it in the map
- currentIndexMap.put(keyTuple, curIndex);
- return (curIndex % totalReducers);
- }
-
- @SuppressWarnings("unchecked")
- public void configure(JobConf job) {
+ // for streaming tables, return the partition index blindly
+ if (wrappedKey instanceof NullablePartitionWritable && ((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+ return (int) ((NullablePartitionWritable)wrappedKey).getPartition();
+ }
+
+ // for partition table, compute the index based on the sampler output
+ Pair <Integer, Integer> indexes;
+ Integer curIndex = -1;
+ Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
+
+ // extract the key from nullablepartitionwritable
+ PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
+
+ try {
+ keyTuple.set(0, key.getValueAsPigType());
+ } catch (ExecException e) {
+ return -1;
+ }
+
+ // if the key is not null and key
+ if (key instanceof NullableTuple && key.getValueAsPigType() != null) {
+ keyTuple = (Tuple)key.getValueAsPigType();
+ }
+
+ indexes = reducerMap.get(keyTuple);
+ // if the reducerMap does not contain the key, do the default hash based partitioning
+ if (indexes == null) {
+ return (Math.abs(keyTuple.hashCode()) % totalReducers);
+ }
+
+ if (currentIndexMap.containsKey(keyTuple)) {
+ curIndex = currentIndexMap.get(keyTuple);
+ }
+
+ if (curIndex >= (indexes.first + indexes.second) || curIndex == -1) {
+ curIndex = indexes.first;
+ } else {
+ curIndex++;
+ }
+
+ // set it in the map
+ currentIndexMap.put(keyTuple, curIndex);
+ return (curIndex % totalReducers);
+ }
+
+ @Override
+ public void setConf(Configuration job) {
+ conf = job;
String keyDistFile = job.get("pig.keyDistFile", "");
if (keyDistFile.length() == 0)
throw new RuntimeException(this.getClass().getSimpleName() + " used but no key distribution found");
- try {
- Integer [] redCnt = new Integer[1];
- reducerMap = MapRedUtil.loadPartitionFile(keyDistFile, redCnt, job, DataType.TUPLE);
- totalReducers = redCnt[0];
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ try {
+ Integer [] redCnt = new Integer[1];
+ reducerMap = MapRedUtil.loadPartitionFile(keyDistFile, redCnt, job, DataType.TUPLE);
+ totalReducers = redCnt[0];
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
}
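
The round-robin arithmetic in getPartition() above is easier to see in isolation. For the javadoc's example entry (k1, 5, 3) with, say, 8 reducers, MapRedUtil.loadPartitionFile (further down in this commit) stores the pair first=5, count=(8+3)-5=6, and successive tuples for k1 then cycle through reducers 5, 6, 7, 0, 1, 2, 3 and back to 5. A minimal sketch of just that loop (the class name and the 8-reducer figure are assumptions for illustration; the real partitioner also handles streaming tables and unknown keys):

    public class RoundRobinDemo {
        public static void main(String[] args) {
            int first = 5, count = 6, totalReducers = 8;
            int curIndex = -1;
            for (int i = 0; i < 9; i++) {
                // Same reset/advance rule as SkewedPartitioner.getPartition()
                if (curIndex >= (first + count) || curIndex == -1) {
                    curIndex = first;
                } else {
                    curIndex++;
                }
                System.out.print((curIndex % totalReducers) + " ");
            }
            // prints: 5 6 7 0 1 2 3 5 6
        }
    }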
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Oct 26 18:05:05 2009
@@ -17,27 +17,26 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
-
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
@@ -53,14 +52,23 @@
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
-public class WeightedRangePartitioner implements Partitioner<PigNullableWritable, Writable> {
+public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
+ implements Configurable {
PigNullableWritable[] quantiles;
RawComparator<PigNullableWritable> comparator;
- public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
- JobConf job;
+ public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
+ = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+
+ Configuration job;
+ @SuppressWarnings("unchecked")
+ @Override
public int getPartition(PigNullableWritable key, Writable value,
int numPartitions){
+ if (comparator == null) {
+ comparator = (RawComparator<PigNullableWritable>)PigMapReduce.sJobContext.getSortComparator();
+ }
+
if(!weightedParts.containsKey(key)){
int index = Arrays.binarySearch(quantiles, key, comparator);
if (index < 0)
@@ -74,10 +82,12 @@
}
@SuppressWarnings("unchecked")
- public void configure(JobConf job) {
- this.job = job;
+ @Override
+ public void setConf(Configuration configuration) {
+ job = configuration;
+
String quantilesFile = job.get("pig.quantilesFile", "");
- comparator = job.getOutputKeyComparator();
+
if (quantilesFile.length() == 0)
throw new RuntimeException(this.getClass().getSimpleName() + " used but no quantiles found");
@@ -154,10 +164,6 @@
}
}
- private boolean areEqual(PigNullableWritable sample, PigNullableWritable writable) {
- return comparator.compare(sample, writable)==0;
- }
-
private void convertToArray(
DataBag quantilesListAsBag) {
ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
@@ -193,4 +199,11 @@
}
return list;
}
+
+ @Override
+ public Configuration getConf() {
+ return job;
+ }
+
+
}
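
WeightedRangePartitioner routes each key by binary-searching the sampled quantile boundaries; the unchanged context around the diff turns a negative Arrays.binarySearch result (which encodes the insertion point) into a partition index. A small standalone illustration of that miss-case conversion with plain ints (the boundary values are made up; the real partitioner uses Pig's sort comparator and also adjusts exact matches):

    import java.util.Arrays;

    public class QuantileLookupDemo {
        public static void main(String[] args) {
            int[] quantiles = {10, 20, 30}; // boundaries for 4 partitions
            int key = 25;
            int index = Arrays.binarySearch(quantiles, key);
            if (index < 0) {
                // binarySearch returns -(insertionPoint) - 1 on a miss,
                // so -(index + 1) recovers the containing range.
                index = -(index + 1);
            }
            System.out.println("partition " + index); // prints partition 2
        }
    }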
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Oct 26 18:05:05 2009
@@ -4,37 +4,31 @@
package org.apache.pig.backend.hadoop.executionengine.util;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Progressable;
import org.apache.pig.PigException;
import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.io.NullablePartitionWritable;
-
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTupleFactory;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.data.Tuple;
-import java.io.InputStream;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
/**
* A class of utility static methods to be used in the hadoop map reduce backend
@@ -51,7 +45,7 @@
* @return the StoreFunc reference
* @throws ExecException
*/
- public static StoreFunc getStoreFunc(JobConf conf) throws ExecException {
+ public static StoreFunc getStoreFunc(Configuration conf) throws ExecException {
StoreFunc store;
try {
String storeFunc = conf.get("pig.storeFunc", "");
@@ -82,80 +76,80 @@
* an OutputFormat to write the data
* @throws IOException
*/
- public static StoreConfig getStoreConfig(JobConf conf) throws IOException {
+ public static StoreConfig getStoreConfig(Configuration conf) throws IOException {
return (StoreConfig) ObjectSerializer.deserialize(conf.get(JobControlCompiler.PIG_STORE_CONFIG));
}
- /**
- * Loads the key distribution sampler file
+ /**
+ * Loads the key distribution sampler file
*
* @param keyDistFile the name for the distribution file
* @param totalReducers gets set to the total number of reducers as found in the dist file
* @param job Ref to a jobCong object
* @param keyType Type of the key to be stored in the return map. It currently treats Tuple as a special case.
- */
- @SuppressWarnings("unchecked")
- public static <E> Map<E, Pair<Integer, Integer> > loadPartitionFile(String keyDistFile,
- Integer[] totalReducers, JobConf job, byte keyType) throws IOException {
-
- Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
-
- InputStream is;
- if (job != null) {
- is = FileLocalizer.openDFSFile(keyDistFile,ConfigurationUtil.toProperties(job));
- } else {
- is = FileLocalizer.openDFSFile(keyDistFile);
- }
- BinStorage loader = new BinStorage();
- DataBag partitionList;
- loader.bindTo(keyDistFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
- Tuple t = loader.getNext();
- if(t==null) {
- throw new RuntimeException("Empty samples file");
- }
- // The keydist file is structured as (key, min, max)
- // min, max being the index of the reducers
- Map<String, Object > distMap = (Map<String, Object>) t.get (0);
- partitionList = (DataBag) distMap.get("partition.list");
- totalReducers[0] = Integer.valueOf(""+distMap.get("totalreducers"));
- Iterator<Tuple> it = partitionList.iterator();
- while (it.hasNext()) {
- Tuple idxTuple = it.next();
- Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
- Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
- // Used to replace the maxIndex with the number of reducers
- if (maxIndex < minIndex) {
- maxIndex = totalReducers[0] + maxIndex;
- }
- E keyT;
-
- // if the join is on more than 1 key
- if (idxTuple.size() > 3) {
- // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
- // it in the reducer map
- Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple();
- for (int i=0; i < idxTuple.size() - 2; i++) {
- keyTuple.append(idxTuple.get(i));
- }
- keyT = (E) keyTuple;
- } else {
- if (keyType == DataType.TUPLE) {
- keyT = (E)DefaultTupleFactory.getInstance().newTuple(1);
- ((Tuple)keyT).set(0,idxTuple.get(0));
- } else {
- keyT = (E) idxTuple.get(0);
- }
- }
- // number of reducers
- Integer cnt = 0;
- if (minIndex < maxIndex) {
- cnt = maxIndex - minIndex;
- } else {
- cnt = totalReducers[0] + maxIndex - minIndex;
- }
-
- reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
- }
- return reducerMap;
- }
+ */
+ @SuppressWarnings("unchecked")
+ public static <E> Map<E, Pair<Integer, Integer> > loadPartitionFile(String keyDistFile,
+ Integer[] totalReducers, Configuration job, byte keyType) throws IOException {
+
+ Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
+
+ InputStream is;
+ if (job != null) {
+ is = FileLocalizer.openDFSFile(keyDistFile,ConfigurationUtil.toProperties(job));
+ } else {
+ is = FileLocalizer.openDFSFile(keyDistFile);
+ }
+ BinStorage loader = new BinStorage();
+ DataBag partitionList;
+ loader.bindTo(keyDistFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+ Tuple t = loader.getNext();
+ if(t==null) {
+ throw new RuntimeException("Empty samples file");
+ }
+ // The keydist file is structured as (key, min, max)
+ // min, max being the index of the reducers
+ Map<String, Object > distMap = (Map<String, Object>) t.get (0);
+ partitionList = (DataBag) distMap.get("partition.list");
+ totalReducers[0] = Integer.valueOf(""+distMap.get("totalreducers"));
+ Iterator<Tuple> it = partitionList.iterator();
+ while (it.hasNext()) {
+ Tuple idxTuple = it.next();
+ Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+ Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+ // Used to replace the maxIndex with the number of reducers
+ if (maxIndex < minIndex) {
+ maxIndex = totalReducers[0] + maxIndex;
+ }
+ E keyT;
+
+ // if the join is on more than 1 key
+ if (idxTuple.size() > 3) {
+ // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
+ // it in the reducer map
+ Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple();
+ for (int i=0; i < idxTuple.size() - 2; i++) {
+ keyTuple.append(idxTuple.get(i));
+ }
+ keyT = (E) keyTuple;
+ } else {
+ if (keyType == DataType.TUPLE) {
+ keyT = (E)DefaultTupleFactory.getInstance().newTuple(1);
+ ((Tuple)keyT).set(0,idxTuple.get(0));
+ } else {
+ keyT = (E) idxTuple.get(0);
+ }
+ }
+ // number of reducers
+ Integer cnt = 0;
+ if (minIndex < maxIndex) {
+ cnt = maxIndex - minIndex;
+ } else {
+ cnt = totalReducers[0] + maxIndex - minIndex;
+ }
+
+ reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
+ }
+ return reducerMap;
+ }
}
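
loadPartitionFile() above expects the key distribution file to be a single BinStorage tuple whose first field is a map with two entries: a "partition.list" bag of (key..., minIndex, maxIndex) tuples and a "totalreducers" count. A rough sketch of assembling the equivalent structure in memory with Pig's tuple and bag factories (illustration only; in practice the sampling job writes this file):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class KeyDistSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            BagFactory bf = BagFactory.getInstance();

            // One entry: key "k1" handled by reducers 5 through 3; since
            // max < min, the range wraps past the last reducer.
            Tuple entry = tf.newTuple();
            entry.append("k1");
            entry.append(5); // minIndex
            entry.append(3); // maxIndex

            DataBag partitionList = bf.newDefaultBag();
            partitionList.add(entry);

            Map<String, Object> dist = new HashMap<String, Object>();
            dist.put("partition.list", partitionList);
            dist.put("totalreducers", 8);

            Tuple keyDist = tf.newTuple(1);
            keyDist.set(0, dist);
            System.out.println(keyDist);
        }
    }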
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Mon Oct 26 18:05:05 2009
@@ -23,12 +23,12 @@
import java.util.Date;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -57,7 +57,7 @@
return "part-" + NUMBER_FORMAT.format(partition);
}
- JobConf job;
+ Configuration job;
String scriptOutputDir;
String scriptLogDir;
@@ -168,10 +168,6 @@
*/
private boolean writeErrorToHDFS(int limit, String taskId) {
if (command.getPersistStderr()) {
- // These are hard-coded begin/end offsets a Hadoop *taskid*
- int beginIndex = 25, endIndex = 31;
-
- //int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
return tipId < command.getLogFilesLimit();
}
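
The HadoopExecutableManager change above drops hard-coded substring offsets in favor of parsing the task attempt id properly. A small illustration (the attempt string is a made-up but well-formed example):

    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class TaskIdDemo {
        public static void main(String[] args) {
            String taskId = "attempt_200910261805_0001_m_000042_0";
            // forName() parses the canonical
            // attempt_<jt>_<job>_<type>_<task>_<n> form;
            // getTaskID().getId() yields the numeric task index.
            int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
            System.out.println(tipId); // prints 42
        }
    }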
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=829883&r1=829882&r2=829883&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Mon Oct 26 18:05:05 2009
@@ -26,16 +26,15 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import java.util.Stack;
-import java.util.Properties ;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -43,13 +42,13 @@
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.datastorage.SeekableInputStream;
import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.WrappedIOException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
public class FileLocalizer {
private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -155,7 +154,7 @@
public static InputStream openDFSFile(String fileName) throws IOException {
SliceWrapper wrapper = PigInputFormat.getActiveSplit();
- JobConf conf = null;
+ Configuration conf = null;
if (wrapper == null) {
conf = PigMapReduce.sJobConf;
}else{
@@ -180,7 +179,7 @@
public static long getSize(String fileName) throws IOException {
SliceWrapper wrapper = PigInputFormat.getActiveSplit();
- JobConf conf = null;
+ Configuration conf = null;
if (wrapper == null) {
conf = PigMapReduce.sJobConf;
}else{
@@ -456,12 +455,6 @@
initialized = true;
relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt());
toDelete.push(relativeRoot);
- // Runtime.getRuntime().addShutdownHook(new Thread() {
- // @Override
- // public void run() {
- // deleteTempFiles();
- // }
- //});
}
}
@@ -593,7 +586,6 @@
}
}
catch (DataStorageException e) {
- //throw WrappedIOException.wrap("Unable to get collect for pattern " + elem.toString(), e);
throw e;
}
}