Posted to commits@pig.apache.org by da...@apache.org on 2011/07/19 03:01:59 UTC
svn commit: r1148117 [2/3] - in /pig/trunk: ./ shims/ shims/src/
shims/src/hadoop20/ shims/src/hadoop20/org/ shims/src/hadoop20/org/apache/
shims/src/hadoop20/org/apache/pig/ shims/src/hadoop20/org/apache/pig/backend/
shims/src/hadoop20/org/apache/pig/...
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.NullablePartitionWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * This class contains the static Mapper & Reducer classes that
+ * are used by Pig to execute Pig Map Reduce jobs. Since
+ * there is a reduce phase, the leaf is bound to be a
+ * POLocalRearrange. So the map phase has to separate the
+ * key and tuple and collect them into the output
+ * collector.
+ *
+ * The shuffle and sort phase sorts these keys & tuples,
+ * groups them into key, List<Tuple> pairs and passes the key and
+ * an iterator over the list to the reduce. The deserialized POPackage
+ * operator is used to package the key, List<Tuple> into pigKey,
+ * Bag<Tuple>, where pigKey is of the appropriate pig type, and
+ * the result of the package is then attached to the reduce
+ * plan, which is executed if it is not empty. Either the result
+ * of the reduce plan or the package result is collected into
+ * the output collector.
+ *
+ * The index of the tuple (that is, which bag it should be placed in by the
+ * package) is packed into the key. This is done so that hadoop sorts the
+ * keys in order of index for join.
+ *
+ * This class is the base class for PigMapReduce, whose implementation
+ * differs slightly across hadoop versions. The PigMapReduce implementation
+ * is located in $PIG_HOME/shims.
+ */
+public class PigGenericMapReduce {
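+
+ /*
+ * A small illustration of the flow described above; the query and data
+ * here are hypothetical, chosen only for this sketch. Given
+ *
+ * B = GROUP A BY name;
+ *
+ * the map phase receives (index, key, value) tuples from the
+ * POLocalRearrange leaf, and Map.collect() emits, for a row (alice, 1):
+ *
+ * key = PigNullableWritable wrapping "alice", with index 0
+ * value = NullableTuple wrapping (alice, 1), with index 0
+ *
+ * After the shuffle, Reduce.reduce() attaches the key and the value
+ * iterator to the deserialized POPackage, which rebuilds
+ * (alice, {(alice,1), (alice,2)}) and feeds it to the reduce plan.
+ */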
+
+ public static JobContext sJobContext = null;
+
+ /**
+ * @deprecated Use {@link UDFContext} instead in the following way to get
+ * the job's {@link Configuration}:
+ * <pre>UdfContext.getUdfContext().getJobConf()</pre>
+ */
+ @Deprecated
+ public static Configuration sJobConf = null;
+
+ public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
+ private final static Tuple DUMMYTUPLE = null;
+
+ public static class Map extends PigMapBase {
+
+ @Override
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
+ Byte index = (Byte)tuple.get(0);
+ PigNullableWritable key =
+ HDataType.getWritableComparableTypes(tuple.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+
+ // Both the key and the value need the index. The key needs it so
+ // that it can be sorted on the index in addition to the key
+ // value. The value needs it so that POPackage can properly
+ // assign the tuple to its slot in the projection.
+ key.setIndex(index);
+ val.setIndex(index);
+
+ oc.write(key, val);
+ }
+ }
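+
+ // A usage sketch for Map.collect() above, assuming a two-way join in
+ // which POLocalRearrange tags input 0 with index 0 and input 1 with
+ // index 1 (the tuple contents are hypothetical):
+ //
+ // Tuple t = tf.newTuple(3); // tf: the TupleFactory in PigMapBase
+ // t.set(0, (byte) 1); // index: second join input
+ // t.set(1, "key1"); // the join key
+ // t.set(2, valueTuple); // tuple routed to bag #1 by POPackage
+ // collect(context, t); // emits ("key1", index 1) -> valueTuple
+ //
+ // Because the index is stamped on both key and value, hadoop sorts all
+ // index-0 tuples of a key ahead of its index-1 tuples, which is the
+ // order POPackage relies on when filling the join bags.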
+
+ /**
+ * This "specialized" map class is ONLY to be used in pig queries with
+ * order by a udf. A UDF used for comparison in the order by expects
+ * to be handed tuples. Hence this map class ensures that the "key" used
+ * in the order by is wrapped into a tuple (if it isn't already a tuple)
+ */
+ public static class MapWithComparator extends PigMapBase {
+
+ @Override
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
+ Object keyTuple = null;
+ if(keyType != DataType.TUPLE) {
+ Object k = tuple.get(1);
+ keyTuple = tf.newTuple(k);
+ } else {
+ keyTuple = tuple.get(1);
+ }
+
+
+ Byte index = (Byte)tuple.get(0);
+ PigNullableWritable key =
+ HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
+ NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+
+ // Both the key and the value need the index. The key needs it so
+ // that it can be sorted on the index in addition to the key
+ // value. The value needs it so that POPackage can properly
+ // assign the tuple to its slot in the projection.
+ key.setIndex(index);
+ val.setIndex(index);
+
+ oc.write(key, val);
+ }
+ }
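+
+ // For example, given a hypothetical query
+ //
+ // B = ORDER A BY $0 USING org.example.MyComparator;
+ //
+ // (org.example.MyComparator is an invented name), an integer key 5 is
+ // emitted as the single-field tuple (5), so the comparison UDF is always
+ // handed two Tuples regardless of the underlying key type.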
+
+ /**
+ * Used by Skewed Join
+ */
+ public static class MapWithPartitionIndex extends Map {
+
+ @Override
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
+ Byte tupleKeyIdx = 2;
+ Byte tupleValIdx = 3;
+
+ Byte index = (Byte)tuple.get(0);
+ Integer partitionIndex = -1;
+ // for partitioning table, the partition index isn't present
+ if (tuple.size() == 3) {
+ //super.collect(oc, tuple);
+ //return;
+ tupleKeyIdx--;
+ tupleValIdx--;
+ } else {
+ partitionIndex = (Integer)tuple.get(1);
+ }
+
+ PigNullableWritable key =
+ HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType);
+
+ NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+
+ NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
+
+ // Both the key and the value need the index. The key needs it so
+ // that it can be sorted on the index in addition to the key
+ // value. The value needs it so that POPackage can properly
+ // assign the tuple to its slot in the projection.
+ wrappedKey.setIndex(index);
+
+ // set the partition
+ wrappedKey.setPartition(partitionIndex);
+ val.setIndex(index);
+ oc.write(wrappedKey, val);
+ }
+
+ @Override
+ protected void runPipeline(PhysicalOperator leaf)
+ throws IOException, InterruptedException {
+
+ while(true){
+ Result res = leaf.getNext(DUMMYTUPLE);
+
+ if(res.returnStatus==POStatus.STATUS_OK){
+ // For POPartitionRearrange, the result is a bag.
+ // This operator is used for skewed join
+ if (res.result instanceof DataBag) {
+ Iterator<Tuple> its = ((DataBag)res.result).iterator();
+ while(its.hasNext()) {
+ collect(outputCollector, its.next());
+ }
+ }else{
+ collect(outputCollector, (Tuple)res.result);
+ }
+ continue;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ return;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ // remember that we had an issue so that in
+ // close() we can do the right thing
+ errorInMap = true;
+ // if there is an errmessage use it
+ String errMsg;
+ if(res.result != null) {
+ errMsg = "Received Error while " +
+ "processing the map plan: " + res.result;
+ } else {
+ errMsg = "Received Error while " +
+ "processing the map plan.";
+ }
+
+ int errCode = 2055;
+ throw new ExecException(errMsg, errCode, PigException.BUG);
+ }
+ }
+ }
+ }
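+
+ // A sketch of the two tuple layouts MapWithPartitionIndex.collect()
+ // distinguishes via the size check above:
+ //
+ // tuple.size() == 3: (index, key, value) -- no partition index,
+ // so the key/value slots shift down and partitionIndex stays -1
+ // tuple.size() == 4: (index, partitionIndex, key, value)
+ //
+ // Either way the partition is stamped onto the NullablePartitionWritable,
+ // which the skewed-join partitioner later uses to route the record.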
+
+ abstract public static class Reduce
+ extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ //The reduce plan
+ protected PhysicalPlan rp = null;
+
+ // Store operators
+ protected List<POStore> stores;
+
+ //The POPackage operator which is the
+ //root of every Map Reduce plan is
+ //obtained through the job conf. The portion
+ //remaining after its removal is the reduce
+ //plan
+ protected POPackage pack;
+
+ ProgressableReporter pigReporter;
+
+ protected Context outputCollector;
+
+ protected boolean errorInReduce = false;
+
+ PhysicalOperator[] roots;
+
+ private PhysicalOperator leaf;
+
+ PigContext pigContext = null;
+ protected volatile boolean initialized = false;
+
+ private boolean inIllustrator = false;
+
+ /**
+ * Set the reduce plan: to be used by local runner for illustrator
+ * @param plan Reduce plan
+ */
+ public void setReducePlan(PhysicalPlan plan) {
+ rp = plan;
+ }
+
+ /**
+ * Configures the Reduce plan, the POPackage operator
+ * and the reporter thread
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ inIllustrator = (context instanceof PigMapReduce.Reduce.IllustratorContext);
+ if (inIllustrator)
+ pack = ((PigMapReduce.Reduce.IllustratorContext) context).pack;
+ Configuration jConf = context.getConfiguration();
+ SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+ sJobContext = context;
+ sJobConfInternal.set(context.getConfiguration());
+ sJobConf = context.getConfiguration();
+ try {
+ PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
+ pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
+
+ if (rp == null)
+ rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
+ .get("pig.reducePlan"));
+ stores = PlanHelper.getStores(rp);
+
+ if (!inIllustrator)
+ pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
+ // To be removed
+ if(rp.isEmpty())
+ log.debug("Reduce Plan empty!");
+ else{
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ rp.explain(baos);
+ log.debug(baos.toString());
+ }
+ pigReporter = new ProgressableReporter();
+ if(!(rp.isEmpty())) {
+ roots = rp.getRoots().toArray(new PhysicalOperator[1]);
+ leaf = rp.getLeaves().get(0);
+ }
+
+ // Get the UDF specific context
+ MapRedUtil.setupUDFContext(jConf);
+
+ } catch (IOException ioe) {
+ String msg = "Problem while configuring reduce plan.";
+ throw new RuntimeException(msg, ioe);
+ }
+ }
+
+ /**
+ * The reduce function, which packages the key and List<Tuple>
+ * into key, Bag<Tuple> after converting the Hadoop-typed key into a Pig type.
+ * The package result is either collected as is, if the reduce plan is
+ * empty, or after passing through the reduce plan.
+ */
+ @Override
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ throws IOException, InterruptedException {
+
+ if (!initialized) {
+ initialized = true;
+
+ // cache the collector for use in runPipeline()
+ // which could additionally be called from close()
+ this.outputCollector = context;
+ pigReporter.setRep(context);
+ PhysicalOperator.setReporter(pigReporter);
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PigStatusReporter.setContext(context);
+ pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+
+ if (!inIllustrator)
+ for (POStore store: stores) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(context);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ }
+
+ // In the case we optimize the join, we combine
+ // POPackage and POForeach - so we could get many
+ // tuples out of the getNext() call of POJoinPackage.
+ // In this case, we process till we see EOP from
+ // POJoinPackage.getNext()
+ if (pack instanceof POJoinPackage)
+ {
+ pack.attachInput(key, tupIter.iterator());
+ while (true)
+ {
+ if (processOnePackageOutput(context))
+ break;
+ }
+ }
+ else {
+ // join is not optimized, so package will
+ // give only one tuple out for the key
+ pack.attachInput(key, tupIter.iterator());
+ processOnePackageOutput(context);
+ }
+ }
+
+ // Returns false if there may be more output for this key (the caller
+ // should call again), true once POPackage signals end of processing (EOP).
+ public boolean processOnePackageOutput(Context oc)
+ throws IOException, InterruptedException {
+
+ Result res = pack.getNext(DUMMYTUPLE);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ Tuple packRes = (Tuple)res.result;
+
+ if(rp.isEmpty()){
+ oc.write(null, packRes);
+ return false;
+ }
+ for (int i = 0; i < roots.length; i++) {
+ roots[i].attachInput(packRes);
+ }
+ runPipeline(leaf);
+
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ return false;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ int errCode = 2093;
+ String msg = "Encountered error in package operator while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ return true;
+ }
+
+ return false;
+
+ }
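+
+ // Caller idiom (see reduce() above) for the POJoinPackage case:
+ //
+ // pack.attachInput(key, tupIter.iterator());
+ // while (true) {
+ // if (processOnePackageOutput(context))
+ // break;
+ // }
+ //
+ // i.e. output is drained until the EOP status flips the return value to true.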
+
+ /**
+ * @param leaf
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ protected void runPipeline(PhysicalOperator leaf)
+ throws InterruptedException, IOException {
+
+ while(true)
+ {
+ Result redRes = leaf.getNext(DUMMYTUPLE);
+ if(redRes.returnStatus==POStatus.STATUS_OK){
+ try{
+ outputCollector.write(null, (Tuple)redRes.result);
+ }catch(Exception e) {
+ throw new IOException(e);
+ }
+ continue;
+ }
+
+ if(redRes.returnStatus==POStatus.STATUS_EOP) {
+ return;
+ }
+
+ if(redRes.returnStatus==POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ if(redRes.returnStatus==POStatus.STATUS_ERR){
+ // remember that we had an issue so that in
+ // close() we can do the right thing
+ errorInReduce = true;
+ // if there is an errmessage use it
+ String msg;
+ if(redRes.result != null) {
+ msg = "Received Error while " +
+ "processing the reduce plan: " + redRes.result;
+ } else {
+ msg = "Received Error while " +
+ "processing the reduce plan.";
+ }
+ int errCode = 2090;
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+ }
+
+ /**
+ * Will be called once all the intermediate keys and values are
+ * processed, so it is the right place to stop the reporter thread.
+ */
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ super.cleanup(context);
+
+ if(errorInReduce) {
+ // there was an error in reduce - just return
+ return;
+ }
+
+ if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
+ // If there is a stream in the pipeline we could
+ // potentially have more to process - so lets
+ // set the flag stating that all map input has been sent
+ // already and then lets run the pipeline one more time
+ // This will result in nothing happening in the case
+ // where there is no stream in the pipeline
+ rp.endOfAllInput = true;
+ runPipeline(leaf);
+ }
+
+ for (POStore store: stores) {
+ if (!initialized) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(context);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ store.tearDown();
+ }
+
+ //Calling EvalFunc.finish()
+ UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
+ try {
+ finisher.visit();
+ } catch (VisitorException e) {
+ throw new IOException("Error trying to finish UDFs",e);
+ }
+
+ PhysicalOperator.setReporter(null);
+ initialized = false;
+ }
+
+ /**
+ * Get reducer's illustrator context
+ *
+ * @param input Input buffer as output by maps
+ * @param pkg package
+ * @return reducer's illustrator context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ abstract public Context getIllustratorContext(Job job,
+ List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException;
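+
+ // A sketch of how a hadoop-version shim can satisfy this, mirroring the
+ // pre-shim code (see the PigMapReduce.java removal below, where an
+ // IllustratorContext inner class lived inside Reduce):
+ //
+ // @Override
+ // public Context getIllustratorContext(Job job,
+ // List<Pair<PigNullableWritable, Writable>> input, POPackage pkg)
+ // throws IOException, InterruptedException {
+ // return new IllustratorContext(job, input, pkg);
+ // }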
+ }
+
+ /**
+ * This "specialized" reduce class is ONLY to be used in pig queries with
+ * order by a udf. A UDF used for comparison in the order by expects
+ * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
+ * ensures that the "key" used in the order by is wrapped into a tuple (if it
+ * isn't already a tuple). This reduce class unwraps this tuple in the case where
+ * the map had wrapped into a tuple and handes the "unwrapped" key to the POPackage
+ * for processing
+ */
+ public static class ReduceWithComparator extends PigMapReduce.Reduce {
+
+ private byte keyType;
+
+ /**
+ * Configures the Reduce plan, the POPackage operator
+ * and the reporter thread
+ */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ keyType = pack.getKeyType();
+ }
+
+ /**
+ * The reduce function, which packages the key and List<Tuple>
+ * into key, Bag<Tuple> after converting the Hadoop-typed key into a Pig type.
+ * The package result is either collected as is, if the reduce plan is
+ * empty, or after passing through the reduce plan.
+ */
+ @Override
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ throws IOException, InterruptedException {
+
+ if (!initialized) {
+ initialized = true;
+
+ // cache the collector for use in runPipeline()
+ // which could additionally be called from close()
+ this.outputCollector = context;
+ pigReporter.setRep(context);
+ PhysicalOperator.setReporter(pigReporter);
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PigStatusReporter.setContext(context);
+ pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+
+ for (POStore store: stores) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(context);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ }
+
+ // If the keyType is not a tuple, the MapWithComparator.collect()
+ // would have wrapped the key into a tuple so that the
+ // comparison UDF used in the order by can process it.
+ // We need to unwrap the key out of the tuple and hand it
+ // to the POPackage for processing
+ if(keyType != DataType.TUPLE) {
+ Tuple t = (Tuple)(key.getValueAsPigType());
+ try {
+ key = HDataType.getWritableComparableTypes(t.get(0), keyType);
+ } catch (ExecException e) {
+ throw e;
+ }
+ }
+
+ pack.attachInput(key, tupIter.iterator());
+
+ Result res = pack.getNext(DUMMYTUPLE);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ Tuple packRes = (Tuple)res.result;
+
+ if(rp.isEmpty()){
+ context.write(null, packRes);
+ return;
+ }
+
+ rp.attachInput(packRes);
+
+ List<PhysicalOperator> leaves = rp.getLeaves();
+
+ PhysicalOperator leaf = leaves.get(0);
+ runPipeline(leaf);
+
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL) {
+ return;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ int errCode = 2093;
+ String msg = "Encountered error in package operator while processing group.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ }
+
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Tue Jul 19 01:01:53 2011
@@ -47,6 +47,7 @@ import org.apache.pig.CollectableLoadFun
import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -266,7 +267,7 @@ public class PigInputFormat extends Inpu
// get the InputFormat from it and ask for splits
InputFormat inpFormat = loadFunc.getInputFormat();
List<InputSplit> oneInputSplits = inpFormat.getSplits(
- new JobContext(inputSpecificJob.getConfiguration(),
+ HadoopShims.createJobContext(inputSpecificJob.getConfiguration(),
jobcontext.getJobID()));
List<InputSplit> oneInputPigSplits = getPigSplits(
oneInputSplits, i, inpTargets.get(i), fs.getDefaultBlockSize(), combinable, confClone);
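
For reference, HadoopShims.createJobContext (introduced by this commit) replaces
the direct JobContext construction removed above. A minimal sketch of the
hadoop20 shim method, assuming only the signature visible at this call site
(the body mirrors the old code; it is not the committed shim source):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;

    public class HadoopShims {
        // Hadoop 0.20 still exposes the public JobContext constructor,
        // so the shim can simply delegate to it.
        public static JobContext createJobContext(Configuration conf, JobID jobId) {
            return new JobContext(conf, jobId);
        }
    }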
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:01:53 2011
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
-
-public abstract class PigMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
- private static final Tuple DUMMYTUPLE = null;
-
- private final Log log = LogFactory.getLog(getClass());
-
- protected byte keyType;
-
- //Map Plan
- protected PhysicalPlan mp = null;
-
- // Store operators
- protected List<POStore> stores;
-
- protected TupleFactory tf = TupleFactory.getInstance();
-
- boolean inIllustrator = false;
-
- Context outputCollector;
-
- // Reporter that will be used by operators
- // to transmit heartbeat
- ProgressableReporter pigReporter;
-
- protected boolean errorInMap = false;
-
- PhysicalOperator[] roots;
-
- private PhysicalOperator leaf;
-
- PigContext pigContext = null;
- private volatile boolean initialized = false;
-
- /**
- * for local map/reduce simulation
- * @param plan the map plan
- */
- public void setMapPlan(PhysicalPlan plan) {
- mp = plan;
- }
-
- /**
- * Will be called when all the tuples in the input
- * are done, so the reporter thread should be closed.
- */
- @Override
- public void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
- if(errorInMap) {
- //error in map - returning
- return;
- }
-
- if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
- // If there is a stream in the pipeline or if this map job belongs to merge-join we could
- // potentially have more to process - so lets
- // set the flag stating that all map input has been sent
- // already and then lets run the pipeline one more time
- // This will result in nothing happening in the case
- // where there is no stream or it is not a merge-join in the pipeline
- mp.endOfAllInput = true;
- runPipeline(leaf);
- }
-
- for (POStore store: stores) {
- if (!initialized) {
- MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(context);
- store.setStoreImpl(impl);
- store.setUp();
- }
- store.tearDown();
- }
-
- //Calling EvalFunc.finish()
- UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
- try {
- finisher.visit();
- } catch (VisitorException e) {
- int errCode = 2121;
- String msg = "Error while calling finish method on UDFs.";
- throw new VisitorException(msg, errCode, PigException.BUG, e);
- }
-
- mp = null;
-
- PhysicalOperator.setReporter(null);
- initialized = false;
- }
-
- /**
- * Configures the mapper with the map plan and the
- * reporter thread
- */
- @SuppressWarnings("unchecked")
- @Override
- public void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
-
- Configuration job = context.getConfiguration();
- SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
- PigMapReduce.sJobContext = context;
- PigMapReduce.sJobConfInternal.set(context.getConfiguration());
- PigMapReduce.sJobConf = context.getConfiguration();
- inIllustrator = (context instanceof IllustratorContext);
-
- PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
- if (pigContext.getLog4jProperties()!=null)
- PropertyConfigurator.configure(pigContext.getLog4jProperties());
-
- if (mp == null)
- mp = (PhysicalPlan) ObjectSerializer.deserialize(
- job.get("pig.mapPlan"));
- stores = PlanHelper.getStores(mp);
-
- // To be removed
- if(mp.isEmpty())
- log.debug("Map Plan empty!");
- else{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- mp.explain(baos);
- log.debug(baos.toString());
- }
- keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
- // till here
-
- pigReporter = new ProgressableReporter();
- // Get the UDF specific context
- MapRedUtil.setupUDFContext(job);
-
- if(!(mp.isEmpty())) {
-
- PigSplit split = (PigSplit)context.getInputSplit();
- List<OperatorKey> targetOpKeys = split.getTargetOps();
-
- ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
- for (OperatorKey targetKey : targetOpKeys) {
- targetOpsAsList.add(mp.getOperator(targetKey));
- }
- roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
- leaf = mp.getLeaves().get(0);
- }
-
- PigStatusReporter.setContext(context);
-
- }
-
- /**
- * The map function that attaches the inpTuple appropriately
- * and executes the map plan if it is not empty. It collects the
- * result of execution into oc, or the input directly into oc
- * if the map plan is empty. The collection is left abstract for the
- * map-only or map-reduce job to implement. Map-only collects
- * the tuple as-is, whereas map-reduce collects it after extracting
- * the key and indexed tuple.
- */
- @Override
- protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
- if(!initialized) {
- initialized = true;
- // cache the collector for use in runPipeline() which
- // can be called from close()
- this.outputCollector = context;
- pigReporter.setRep(context);
- PhysicalOperator.setReporter(pigReporter);
-
- for (POStore store: stores) {
- MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(context);
- store.setStoreImpl(impl);
- if (!pigContext.inIllustrator)
- store.setUp();
- }
-
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
- PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
- PhysicalOperator.setPigLogger(pigHadoopLogger);
- }
-
- if (mp.isEmpty()) {
- collect(context,inpTuple);
- return;
- }
-
- for (PhysicalOperator root : roots) {
- if (inIllustrator) {
- if (root != null) {
- root.attachInput(inpTuple);
- }
- } else {
- root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
- }
- }
-
- runPipeline(leaf);
- }
-
- protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
- while(true){
- Result res = leaf.getNext(DUMMYTUPLE);
- if(res.returnStatus==POStatus.STATUS_OK){
- collect(outputCollector,(Tuple)res.result);
- continue;
- }
-
- if(res.returnStatus==POStatus.STATUS_EOP) {
- return;
- }
-
- if(res.returnStatus==POStatus.STATUS_NULL)
- continue;
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
- // close() we can do the right thing
- errorInMap = true;
- // if there is an errmessage use it
- String errMsg;
- if(res.result != null) {
- errMsg = "Received Error while " +
- "processing the map plan: " + res.result;
- } else {
- errMsg = "Received Error while " +
- "processing the map plan.";
- }
-
- int errCode = 2055;
- ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
- throw ee;
- }
- }
-
- }
-
- abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
-
- /**
- * @return the keyType
- */
- public byte getKeyType() {
- return keyType;
- }
-
- /**
- * @param keyType the keyType to set
- */
- public void setKeyType(byte keyType) {
- this.keyType = keyType;
- }
-
- /**
- *
- * Get mapper's illustrator context
- *
- * @param conf Configuration
- * @param input Input bag to serve as data source
- * @param output Map output buffer
- * @param split the split
- * @return Illustrator's context
- * @throws IOException
- * @throws InterruptedException
- */
- public Context getIllustratorContext(Configuration conf, DataBag input,
- List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
- throws IOException, InterruptedException {
- return new IllustratorContext(conf, input, output, split);
- }
-
- public class IllustratorContext extends Context {
- private DataBag input;
- List<Pair<PigNullableWritable, Writable>> output;
- private Iterator<Tuple> it = null;
- private Tuple value = null;
- private boolean init = false;
-
- public IllustratorContext(Configuration conf, DataBag input,
- List<Pair<PigNullableWritable, Writable>> output,
- InputSplit split) throws IOException, InterruptedException {
- super(conf, new TaskAttemptID(), null, null, null, null, split);
- if (output == null)
- throw new IOException("Null output can not be used");
- this.input = input; this.output = output;
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (input == null) {
- if (!init) {
- init = true;
- return true;
- }
- return false;
- }
- if (it == null)
- it = input.iterator();
- if (!it.hasNext())
- return false;
- value = it.next();
- return true;
- }
-
- @Override
- public Text getCurrentKey() {
- return null;
- }
-
- @Override
- public Tuple getCurrentValue() {
- return value;
- }
-
- @Override
- public void write(PigNullableWritable key, Writable value)
- throws IOException, InterruptedException {
- output.add(new Pair<PigNullableWritable, Writable>(key, value));
- }
-
- @Override
- public void progress() {
-
- }
- }
-}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:01:53 2011
@@ -1,768 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Collections;
-import java.util.Comparator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.pen.FakeRawKeyValueIterator;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.NullablePartitionWritable;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
-
-/**
- * This class contains the static Mapper & Reducer classes that
- * are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a
- * POLocalRearrange. So the map phase has to separate the
- * key and tuple and collect them into the output
- * collector.
- *
- * The shuffle and sort phase sorts these keys & tuples,
- * groups them into key, List<Tuple> pairs and passes the key and
- * an iterator over the list to the reduce. The deserialized POPackage
- * operator is used to package the key, List<Tuple> into pigKey,
- * Bag<Tuple>, where pigKey is of the appropriate pig type, and
- * the result of the package is then attached to the reduce
- * plan, which is executed if it is not empty. Either the result
- * of the reduce plan or the package result is collected into
- * the output collector.
- *
- * The index of the tuple (that is, which bag it should be placed in by the
- * package) is packed into the key. This is done so that hadoop sorts the
- * keys in order of index for join.
- *
- */
-public class PigMapReduce {
-
- public static JobContext sJobContext = null;
-
- /**
- * @deprecated Use {@link UDFContext} instead in the following way to get
- * the job's {@link Configuration}:
- * <pre>UdfContext.getUdfContext().getJobConf()</pre>
- */
- @Deprecated
- public static Configuration sJobConf = null;
-
- public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
- private final static Tuple DUMMYTUPLE = null;
-
- public static class Map extends PigMapBase {
-
- @Override
- public void collect(Context oc, Tuple tuple)
- throws InterruptedException, IOException {
-
- Byte index = (Byte)tuple.get(0);
- PigNullableWritable key =
- HDataType.getWritableComparableTypes(tuple.get(1), keyType);
- NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
- // Both the key and the value need the index. The key needs it so
- // that it can be sorted on the index in addition to the key
- // value. The value needs it so that POPackage can properly
- // assign the tuple to its slot in the projection.
- key.setIndex(index);
- val.setIndex(index);
-
- oc.write(key, val);
- }
- }
-
- /**
- * This "specialized" map class is ONLY to be used in pig queries with
- * order by a udf. A UDF used for comparison in the order by expects
- * to be handed tuples. Hence this map class ensures that the "key" used
- * in the order by is wrapped into a tuple (if it isn't already a tuple)
- */
- public static class MapWithComparator extends PigMapBase {
-
- @Override
- public void collect(Context oc, Tuple tuple)
- throws InterruptedException, IOException {
-
- Object keyTuple = null;
- if(keyType != DataType.TUPLE) {
- Object k = tuple.get(1);
- keyTuple = tf.newTuple(k);
- } else {
- keyTuple = tuple.get(1);
- }
-
-
- Byte index = (Byte)tuple.get(0);
- PigNullableWritable key =
- HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
- NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
- // Both the key and the value need the index. The key needs it so
- // that it can be sorted on the index in addition to the key
- // value. The value needs it so that POPackage can properly
- // assign the tuple to its slot in the projection.
- key.setIndex(index);
- val.setIndex(index);
-
- oc.write(key, val);
- }
- }
-
- /**
- * Used by Skewed Join
- */
- public static class MapWithPartitionIndex extends Map {
-
- @Override
- public void collect(Context oc, Tuple tuple)
- throws InterruptedException, IOException {
-
- Byte tupleKeyIdx = 2;
- Byte tupleValIdx = 3;
-
- Byte index = (Byte)tuple.get(0);
- Integer partitionIndex = -1;
- // for partitioning table, the partition index isn't present
- if (tuple.size() == 3) {
- //super.collect(oc, tuple);
- //return;
- tupleKeyIdx--;
- tupleValIdx--;
- } else {
- partitionIndex = (Integer)tuple.get(1);
- }
-
- PigNullableWritable key =
- HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType);
-
- NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
-
- NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
-
- // Both the key and the value need the index. The key needs it so
- // that it can be sorted on the index in addition to the key
- // value. The value needs it so that POPackage can properly
- // assign the tuple to its slot in the projection.
- wrappedKey.setIndex(index);
-
- // set the partition
- wrappedKey.setPartition(partitionIndex);
- val.setIndex(index);
- oc.write(wrappedKey, val);
- }
-
- @Override
- protected void runPipeline(PhysicalOperator leaf)
- throws IOException, InterruptedException {
-
- while(true){
- Result res = leaf.getNext(DUMMYTUPLE);
-
- if(res.returnStatus==POStatus.STATUS_OK){
- // For POPartitionRearrange, the result is a bag.
- // This operator is used for skewed join
- if (res.result instanceof DataBag) {
- Iterator<Tuple> its = ((DataBag)res.result).iterator();
- while(its.hasNext()) {
- collect(outputCollector, its.next());
- }
- }else{
- collect(outputCollector, (Tuple)res.result);
- }
- continue;
- }
-
- if(res.returnStatus==POStatus.STATUS_EOP) {
- return;
- }
-
- if(res.returnStatus==POStatus.STATUS_NULL) {
- continue;
- }
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
- // close() we can do the right thing
- errorInMap = true;
- // if there is an errmessage use it
- String errMsg;
- if(res.result != null) {
- errMsg = "Received Error while " +
- "processing the map plan: " + res.result;
- } else {
- errMsg = "Received Error while " +
- "processing the map plan.";
- }
-
- int errCode = 2055;
- throw new ExecException(errMsg, errCode, PigException.BUG);
- }
- }
- }
- }
-
- public static class Reduce
- extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
- protected final Log log = LogFactory.getLog(getClass());
-
- //The reduce plan
- protected PhysicalPlan rp = null;
-
- // Store operators
- protected List<POStore> stores;
-
- //The POPackage operator which is the
- //root of every Map Reduce plan is
- //obtained through the job conf. The portion
- //remaining after its removal is the reduce
- //plan
- protected POPackage pack;
-
- ProgressableReporter pigReporter;
-
- protected Context outputCollector;
-
- protected boolean errorInReduce = false;
-
- PhysicalOperator[] roots;
-
- private PhysicalOperator leaf;
-
- PigContext pigContext = null;
- protected volatile boolean initialized = false;
-
- private boolean inIllustrator = false;
-
- /**
- * Set the reduce plan: to be used by local runner for illustrator
- * @param plan Reduce plan
- */
- public void setReducePlan(PhysicalPlan plan) {
- rp = plan;
- }
-
- /**
- * Configures the Reduce plan, the POPackage operator
- * and the reporter thread
- */
- @SuppressWarnings("unchecked")
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- inIllustrator = (context instanceof IllustratorContext);
- if (inIllustrator)
- pack = ((IllustratorContext) context).pack;
- Configuration jConf = context.getConfiguration();
- SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
- sJobContext = context;
- sJobConfInternal.set(context.getConfiguration());
- sJobConf = context.getConfiguration();
- try {
- PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-
- if (rp == null)
- rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
- .get("pig.reducePlan"));
- stores = PlanHelper.getStores(rp);
-
- if (!inIllustrator)
- pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
- // To be removed
- if(rp.isEmpty())
- log.debug("Reduce Plan empty!");
- else{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- rp.explain(baos);
- log.debug(baos.toString());
- }
- pigReporter = new ProgressableReporter();
- if(!(rp.isEmpty())) {
- roots = rp.getRoots().toArray(new PhysicalOperator[1]);
- leaf = rp.getLeaves().get(0);
- }
-
- // Get the UDF specific context
- MapRedUtil.setupUDFContext(jConf);
-
- } catch (IOException ioe) {
- String msg = "Problem while configuring reduce plan.";
- throw new RuntimeException(msg, ioe);
- }
- }
-
- /**
- * The reduce function, which packages the key and List<Tuple>
- * into key, Bag<Tuple> after converting the Hadoop-typed key into a Pig type.
- * The package result is either collected as is, if the reduce plan is
- * empty, or after passing through the reduce plan.
- */
- @Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
- throws IOException, InterruptedException {
-
- if (!initialized) {
- initialized = true;
-
- // cache the collector for use in runPipeline()
- // which could additionally be called from close()
- this.outputCollector = context;
- pigReporter.setRep(context);
- PhysicalOperator.setReporter(pigReporter);
-
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
- PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
- PhysicalOperator.setPigLogger(pigHadoopLogger);
-
- if (!inIllustrator)
- for (POStore store: stores) {
- MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(context);
- store.setStoreImpl(impl);
- store.setUp();
- }
- }
-
- // In the case we optimize the join, we combine
- // POPackage and POForeach - so we could get many
- // tuples out of the getNext() call of POJoinPackage.
- // In this case, we process till we see EOP from
- // POJoinPackage.getNext()
- if (pack instanceof POJoinPackage)
- {
- pack.attachInput(key, tupIter.iterator());
- while (true)
- {
- if (processOnePackageOutput(context))
- break;
- }
- }
- else {
- // join is not optimized, so package will
- // give only one tuple out for the key
- pack.attachInput(key, tupIter.iterator());
- processOnePackageOutput(context);
- }
- }
-
- // Returns false if there may be more output for this key (the caller
- // should call again), true once POPackage signals end of processing (EOP).
- public boolean processOnePackageOutput(Context oc)
- throws IOException, InterruptedException {
-
- Result res = pack.getNext(DUMMYTUPLE);
- if(res.returnStatus==POStatus.STATUS_OK){
- Tuple packRes = (Tuple)res.result;
-
- if(rp.isEmpty()){
- oc.write(null, packRes);
- return false;
- }
- for (int i = 0; i < roots.length; i++) {
- roots[i].attachInput(packRes);
- }
- runPipeline(leaf);
-
- }
-
- if(res.returnStatus==POStatus.STATUS_NULL) {
- return false;
- }
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- int errCode = 2093;
- String msg = "Encountered error in package operator while processing group.";
- throw new ExecException(msg, errCode, PigException.BUG);
- }
-
- if(res.returnStatus==POStatus.STATUS_EOP) {
- return true;
- }
-
- return false;
-
- }
-
- /**
- * @param leaf
- * @throws InterruptedException
- * @throws IOException
- */
- protected void runPipeline(PhysicalOperator leaf)
- throws InterruptedException, IOException {
-
- while(true)
- {
- Result redRes = leaf.getNext(DUMMYTUPLE);
- if(redRes.returnStatus==POStatus.STATUS_OK){
- try{
- outputCollector.write(null, (Tuple)redRes.result);
- }catch(Exception e) {
- throw new IOException(e);
- }
- continue;
- }
-
- if(redRes.returnStatus==POStatus.STATUS_EOP) {
- return;
- }
-
- if(redRes.returnStatus==POStatus.STATUS_NULL) {
- continue;
- }
-
- if(redRes.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
- // close() we can do the right thing
- errorInReduce = true;
- // if there is an errmessage use it
- String msg;
- if(redRes.result != null) {
- msg = "Received Error while " +
- "processing the reduce plan: " + redRes.result;
- } else {
- msg = "Received Error while " +
- "processing the reduce plan.";
- }
- int errCode = 2090;
- throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
- }
-
- /**
- * Will be called once all the intermediate keys and values are
- * processed, so it is the right place to stop the reporter thread.
- */
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
-
- if(errorInReduce) {
- // there was an error in reduce - just return
- return;
- }
-
- if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
- // If there is a stream in the pipeline we could
- // potentially have more to process - so lets
- // set the flag stating that all map input has been sent
- // already and then lets run the pipeline one more time
- // This will result in nothing happening in the case
- // where there is no stream in the pipeline
- rp.endOfAllInput = true;
- runPipeline(leaf);
- }
-
- for (POStore store: stores) {
- if (!initialized) {
- MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(context);
- store.setStoreImpl(impl);
- store.setUp();
- }
- store.tearDown();
- }
-
- //Calling EvalFunc.finish()
- UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
- try {
- finisher.visit();
- } catch (VisitorException e) {
- throw new IOException("Error trying to finish UDFs",e);
- }
-
- PhysicalOperator.setReporter(null);
- initialized = false;
- }
-
- /**
- * Get reducer's illustrator context
- *
- * @param input Input buffer as output by maps
- * @param pkg package
- * @return reducer's illustrator context
- * @throws IOException
- * @throws InterruptedException
- */
- public Context getIllustratorContext(Job job,
- List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
- return new IllustratorContext(job, input, pkg);
- }
-
- @SuppressWarnings("unchecked")
- public class IllustratorContext extends Context {
- private PigNullableWritable currentKey = null, nextKey = null;
- private NullableTuple nextValue = null;
- private List<NullableTuple> currentValues = null;
- private Iterator<Pair<PigNullableWritable, Writable>> it;
- private final ByteArrayOutputStream bos;
- private final DataOutputStream dos;
- private final RawComparator sortComparator, groupingComparator;
- POPackage pack = null;
-
- public IllustratorContext(Job job,
- List<Pair<PigNullableWritable, Writable>> input,
- POPackage pkg
- ) throws IOException, InterruptedException {
- super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
- null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class);
- bos = new ByteArrayOutputStream();
- dos = new DataOutputStream(bos);
- org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
- sortComparator = nwJob.getSortComparator();
- groupingComparator = nwJob.getGroupingComparator();
-
- Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
- @Override
- public int compare(Pair<PigNullableWritable, Writable> o1,
- Pair<PigNullableWritable, Writable> o2) {
- try {
- o1.first.write(dos);
- int l1 = bos.size();
- o2.first.write(dos);
- int l2 = bos.size();
- byte[] bytes = bos.toByteArray();
- bos.reset();
- return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
- } catch (IOException e) {
- throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
- }
- }
- }
- );
- currentValues = new ArrayList<NullableTuple>();
- it = input.iterator();
- if (it.hasNext()) {
- Pair<PigNullableWritable, Writable> entry = it.next();
- nextKey = entry.first;
- nextValue = (NullableTuple) entry.second;
- }
- pack = pkg;
- }
-
- @Override
- public PigNullableWritable getCurrentKey() {
- return currentKey;
- }
-
- @Override
- public boolean nextKey() {
- if (nextKey == null)
- return false;
- currentKey = nextKey;
- currentValues.clear();
- currentValues.add(nextValue);
- nextKey = null;
- for(; it.hasNext(); ) {
- Pair<PigNullableWritable, Writable> entry = it.next();
- /* Why can't raw comparison be used?
- byte[] bytes;
- int l1, l2;
- try {
- currentKey.write(dos);
- l1 = bos.size();
- entry.first.write(dos);
- l2 = bos.size();
- bytes = bos.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException("nextKey exception : "+e.getMessage());
- }
- bos.reset();
- if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
- */
- if (groupingComparator.compare(currentKey, entry.first) == 0)
- {
- currentValues.add((NullableTuple)entry.second);
- } else {
- nextKey = entry.first;
- nextValue = (NullableTuple) entry.second;
- break;
- }
- }
- return true;
- }
-
- @Override
- public Iterable<NullableTuple> getValues() {
- return currentValues;
- }
-
- @Override
- public void write(PigNullableWritable k, Writable t) {
- }
-
- @Override
- public void progress() {
- }
- }
- }
-
- /**
- * This "specialized" reduce class is ONLY to be used in pig queries with
- * order by a udf. A UDF used for comparison in the order by expects
- * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
- * ensures that the "key" used in the order by is wrapped into a tuple (if it
- * isn't already a tuple). This reduce class unwraps this tuple in the case where
- * the map had wrapped into a tuple and handes the "unwrapped" key to the POPackage
- * for processing
- */
- public static class ReduceWithComparator extends PigMapReduce.Reduce {
-
- private byte keyType;
-
- /**
- * Configures the Reduce plan, the POPackage operator
- * and the reporter thread
- */
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- keyType = pack.getKeyType();
- }
-
-        /**
-         * The reduce function, which packages the key and List<Tuple> into a
-         * (key, Bag<Tuple>) pair after converting the Hadoop key type into
-         * the corresponding Pig type. The packaged result is collected as-is
-         * if the reduce plan is empty, or after passing through the reduce
-         * plan otherwise.
-         */
- @Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
- throws IOException, InterruptedException {
-
- if (!initialized) {
- initialized = true;
-
- // cache the collector for use in runPipeline()
- // which could additionally be called from close()
- this.outputCollector = context;
- pigReporter.setRep(context);
- PhysicalOperator.setReporter(pigReporter);
-
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
- PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
- PhysicalOperator.setPigLogger(pigHadoopLogger);
-
- for (POStore store: stores) {
- MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(context);
- store.setStoreImpl(impl);
- store.setUp();
- }
- }
-
- // If the keyType is not a tuple, the MapWithComparator.collect()
- // would have wrapped the key into a tuple so that the
- // comparison UDF used in the order by can process it.
- // We need to unwrap the key out of the tuple and hand it
- // to the POPackage for processing
- if(keyType != DataType.TUPLE) {
- Tuple t = (Tuple)(key.getValueAsPigType());
-                key = HDataType.getWritableComparableTypes(t.get(0), keyType);
- }
-
- pack.attachInput(key, tupIter.iterator());
-
- Result res = pack.getNext(DUMMYTUPLE);
- if(res.returnStatus==POStatus.STATUS_OK){
- Tuple packRes = (Tuple)res.result;
-
- if(rp.isEmpty()){
- context.write(null, packRes);
- return;
- }
-
- rp.attachInput(packRes);
-
- List<PhysicalOperator> leaves = rp.getLeaves();
-
- PhysicalOperator leaf = leaves.get(0);
- runPipeline(leaf);
-
- }
-
- if(res.returnStatus==POStatus.STATUS_NULL) {
- return;
- }
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- int errCode = 2093;
- String msg = "Encountered error in package operator while processing group.";
- throw new ExecException(msg, errCode, PigException.BUG);
- }
-
- }
-
- }
-
-}
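
For context on the IllustratorContext code above: the job's sort comparator
is a RawComparator, so the anonymous Comparator serializes both keys into one
shared buffer and compares the two byte ranges. A minimal standalone sketch
of that pattern follows; the class and variable names are illustrative only
and are not part of this commit.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Comparator;

    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Writable;

    // Compares two Writable keys by serializing both into a shared buffer
    // and handing the resulting byte ranges to a Hadoop RawComparator.
    class SerializedKeyComparator<K extends Writable> implements Comparator<K> {
        private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        private final DataOutputStream dos = new DataOutputStream(bos);
        private final RawComparator<K> raw;

        SerializedKeyComparator(RawComparator<K> raw) {
            this.raw = raw;
        }

        @Override
        public int compare(K k1, K k2) {
            try {
                k1.write(dos);
                int l1 = bos.size();          // end of first key's bytes
                k2.write(dos);
                int l2 = bos.size();          // end of second key's bytes
                byte[] bytes = bos.toByteArray();
                bos.reset();                  // reuse the buffer next call
                return raw.compare(bytes, 0, l1, bytes, l1, l2 - l1);
            } catch (IOException e) {
                throw new RuntimeException("Serialization failed in compare", e);
            }
        }
    }
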
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Jul 19 01:01:53 2011
@@ -31,6 +31,7 @@ import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -103,7 +104,7 @@ public class PigOutputCommitter extends
MapRedUtil.setupUDFContext(context.getConfiguration());
// make a copy of the context so that the actions after this call
// do not end up updating the same context
- TaskAttemptContext contextCopy = new TaskAttemptContext(
+ TaskAttemptContext contextCopy = HadoopShims.createTaskAttemptContext(
context.getConfiguration(), context.getTaskAttemptID());
// call setLocation() on the storeFunc so that if there are any
@@ -118,8 +119,9 @@ public class PigOutputCommitter extends
POStore store) throws IOException {
// make a copy of the context so that the actions after this call
// do not end up updating the same context
- JobContext contextCopy = new JobContext(
+ JobContext contextCopy = HadoopShims.createJobContext(
context.getConfiguration(), context.getJobID());
+ MapRedUtil.setupUDFContext(context.getConfiguration());
// call setLocation() on the storeFunc so that if there are any
// side effects like setting map.output.dir on the Configuration
@@ -165,7 +167,7 @@ public class PigOutputCommitter extends
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
- if(context.getTaskAttemptID().isMap()) {
+ if(HadoopShims.isMap(context.getTaskAttemptID())) {
for (Pair<OutputCommitter, POStore> mapCommitter :
mapOutputCommitters) {
TaskAttemptContext updatedContext = setUpContext(context,
@@ -184,7 +186,7 @@ public class PigOutputCommitter extends
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
- if(context.getTaskAttemptID().isMap()) {
+ if(HadoopShims.isMap(context.getTaskAttemptID())) {
for (Pair<OutputCommitter, POStore> mapCommitter :
mapOutputCommitters) {
TaskAttemptContext updatedContext = setUpContext(context,
@@ -205,7 +207,7 @@ public class PigOutputCommitter extends
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
boolean needCommit = false;
- if(context.getTaskAttemptID().isMap()) {
+ if(HadoopShims.isMap(context.getTaskAttemptID())) {
for (Pair<OutputCommitter, POStore> mapCommitter :
mapOutputCommitters) {
TaskAttemptContext updatedContext = setUpContext(context,
@@ -244,7 +246,7 @@ public class PigOutputCommitter extends
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
- if(context.getTaskAttemptID().isMap()) {
+ if(HadoopShims.isMap(context.getTaskAttemptID())) {
for (Pair<OutputCommitter, POStore> mapCommitter :
mapOutputCommitters) {
TaskAttemptContext updatedContext = setUpContext(context,
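
The substitution in PigOutputCommitter is mechanical: every direct
construction of a JobContext or TaskAttemptContext, and every call to
TaskAttemptID.isMap(), now goes through HadoopShims so the same source can
build against either Hadoop line via the new shims/ layer. The real
implementations live under shims/src/ in this commit; the following is only
a plausible hadoop20-flavored sketch of the three methods used here.

    package org.apache.pig.backend.hadoop.executionengine.shims;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class HadoopShims {
        // On Hadoop 20 the context types are concrete classes, so the
        // shim can instantiate them directly.
        public static JobContext createJobContext(Configuration conf, JobID id) {
            return new JobContext(conf, id);
        }

        public static TaskAttemptContext createTaskAttemptContext(
                Configuration conf, TaskAttemptID id) {
            return new TaskAttemptContext(conf, id);
        }

        public static boolean isMap(TaskAttemptID id) {
            return id.isMap();
        }
    }
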
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Jul 19 01:01:53 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -192,7 +193,7 @@ public class PigOutputFormat extends Out
for (POStore store : stores) {
// make a copy of the original JobContext so that
// each OutputFormat get a different copy
- JobContext jobContextCopy = new JobContext(
+ JobContext jobContextCopy = HadoopShims.createJobContext(
jobcontext.getConfiguration(), jobcontext.getJobID());
// set output location
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Tue Jul 19 01:01:53 2011
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.ExecType;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -37,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
@@ -47,12 +50,14 @@ import org.apache.pig.impl.io.NullableTe
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
PigNullableWritable[] quantiles;
RawComparator<PigNullableWritable> comparator;
+ PigContext pigContext;
final public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
= new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
@@ -85,6 +90,13 @@ public class WeightedRangePartitioner ex
public void setConf(Configuration configuration) {
job = configuration;
+ try {
+ pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+ } catch (IOException e1) {
+ // should not happen
+ e1.printStackTrace();
+ }
+
String quantilesFile = configuration.get("pig.quantilesFile", "");
if (quantilesFile.length() == 0) {
@@ -96,7 +108,12 @@ public class WeightedRangePartitioner ex
// use local file system to get the quantilesFile
- Configuration conf = new Configuration(false);
+ Configuration conf;
+ if (pigContext.getExecType()==ExecType.MAPREDUCE) {
+ conf = new Configuration(true);
+ } else {
+ conf = new Configuration(false);
+ }
if (configuration.get("fs.file.impl")!=null)
conf.set("fs.file.impl", configuration.get("fs.file.impl"));
if (configuration.get("fs.hdfs.impl")!=null)
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul 19 01:01:53 2011
@@ -142,6 +142,7 @@ public class PigContext implements Seria
this.execType = execType;
this.properties = properties;
+ this.properties.setProperty("exectype", this.execType.name());
String pigJar = JarManager.findContainingJar(Main.class);
String hadoopJar = JarManager.findContainingJar(FileSystem.class);
if (pigJar != null) {
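
Recording the exec type as an ordinary property means code that only holds
the Properties object can recover the execution mode. A hypothetical
reader-side check (the "exectype" key comes from the hunk above; the helper
itself is illustrative):

    import java.util.Properties;

    import org.apache.pig.ExecType;

    class ExecTypeCheck {
        // Recover the execution mode that PigContext recorded in its
        // properties; default to LOCAL when the property is absent.
        static boolean isMapReduce(Properties props) {
            String name = props.getProperty("exectype", ExecType.LOCAL.name());
            return ExecType.valueOf(name) == ExecType.MAPREDUCE;
        }
    }
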
Modified: pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/PigFile.java Tue Jul 19 01:01:53 2011
@@ -34,6 +34,7 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -41,7 +42,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
-
public class PigFile {
private String file = null;
boolean append = false;
@@ -70,7 +70,7 @@ public class PigFile {
public void store(DataBag data, FuncSpec storeFuncSpec, PigContext pigContext) throws IOException {
Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
// create a simulated JobContext
- JobContext jc = new JobContext(conf, new JobID());
+ JobContext jc = HadoopShims.createJobContext(conf, new JobID());
StoreFuncInterface sfunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(
storeFuncSpec);
OutputFormat<?,?> of = sfunc.getOutputFormat();
@@ -80,7 +80,7 @@ public class PigFile {
PigOutputFormat.setLocation(jc, store);
OutputCommitter oc;
// create a simulated TaskAttemptContext
- TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ TaskAttemptContext tac = HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID());
PigOutputFormat.setLocation(tac, store);
RecordWriter<?,?> rw ;
try {
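
PigFile.store drives a complete OutputFormat lifecycle outside of a real
MapReduce job, which is why it fabricates both contexts through the shim. A
condensed sketch of that lifecycle under the standard Hadoop 0.20
OutputCommitter contract; the wrapper class and the elided write step are
illustrative, not the actual PigFile code:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;

    class SimulatedStoreSketch {
        // Runs the OutputFormat/OutputCommitter lifecycle against simulated
        // contexts, the way PigFile.store does.
        static <K, V> void run(Configuration conf, OutputFormat<K, V> of)
                throws IOException, InterruptedException {
            JobContext jc = HadoopShims.createJobContext(conf, new JobID());
            TaskAttemptContext tac =
                    HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID());
            OutputCommitter oc = of.getOutputCommitter(tac);
            oc.setupJob(jc);                  // prepare the output location
            oc.setupTask(tac);
            RecordWriter<K, V> rw = of.getRecordWriter(tac);
            // ... write records through rw here ...
            rw.close(tac);
            if (oc.needsTaskCommit(tac)) {
                oc.commitTask(tac);           // promote the task's output
            }
            oc.cleanupJob(jc);                // finalize the job's output
        }
    }
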