You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/28 23:59:43 UTC
svn commit: r1590819 - in /pig/trunk: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengi...
Author: cheolsoo
Date: Mon Apr 28 21:59:43 2014
New Revision: 1590819
URL: http://svn.apache.org/r1590819
Log:
PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
Added:
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 28 21:59:43 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
+
PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)
PIG-3737: Bundle dependent jars in distribution in %PIG_HOME%/lib folder (daijy)
Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java?rev=1590819&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java Mon Apr 28 21:59:43 2014
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+
+public class TaskContext<T> { // Wraps an engine-specific task context of type T behind one counter/progress/status API.
+ private T context; // wrapped context; may be null, and the methods that use it check for that first
+
+ public TaskContext(T context) { // stores the given context as-is; no validation is done
+ this.context = context;
+ }
+
+ public T get() { // returns the wrapped context exactly as passed to the constructor (possibly null)
+ return context;
+ }
+
+ public Counter getCounter(Enum<?> name) { // looks up a counter by enum; result is null for a null or unsupported context
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) { // Hadoop MapReduce task context
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(name);
+ } else if (context instanceof FetchContext) { // Pig fetch-mode context
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(name);
+ }
+ return counter; // still null if neither branch matched
+ }
+
+ public Counter getCounter(String group, String name) { // same lookup as above, keyed by group and counter name
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(group, name);
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(group, name);
+ }
+ return counter; // still null if neither branch matched
+ }
+
+ public boolean incrCounter(Enum<?> name, long delta) { // adds delta to the enum counter; true only if a supported context handled it
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(name);
+ counter.increment(delta); // NOTE(review): counter is not null-checked before use - confirm getCounter() cannot return null here
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(name);
+ counter.increment(delta); // NOTE(review): same unchecked dereference as the MapReduce branch
+ return true;
+ }
+ return false; // unsupported context type: nothing was incremented
+ }
+
+ public boolean incrCounter(String group, String name, long delta) { // group/name variant of incrCounter; same return contract
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(group, name);
+ counter.increment(delta); // NOTE(review): counter is not null-checked before use - confirm getCounter() cannot return null here
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(group, name);
+ counter.increment(delta); // NOTE(review): same unchecked dereference as the MapReduce branch
+ return true;
+ }
+ return false; // unsupported context type: nothing was incremented
+ }
+
+ public void progress() { // calls progress() on a MapReduce context; does nothing for null or any other context type
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.progress();
+ }
+ }
+
+ public float getProgress() { // always returns 0 in this variant; the wrapped context is never consulted
+ return 0f;
+ }
+
+ public void setStatus(String status) { // forwards the status string to a MapReduce context; does nothing for null or other types
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.setStatus(status);
+ }
+ }
+}
Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java?rev=1590819&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java Mon Apr 28 21:59:43 2014
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+public class TaskContext<T> { // Wraps an engine-specific task context of type T (MapReduce, fetch or Tez) behind one API.
+ private T context; // wrapped context; may be null, and every method that uses it checks for that first
+
+ public TaskContext(T context) { // stores the given context as-is; no validation is done
+ this.context = context;
+ }
+
+ public T get() { // returns the wrapped context exactly as passed to the constructor (possibly null)
+ return context;
+ }
+
+ public Counter getCounter(Enum<?> name) { // looks up a counter by enum; there is no Tez branch, so a TezProcessorContext yields null
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) { // Hadoop MapReduce task context
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(name);
+ } else if (context instanceof FetchContext) { // Pig fetch-mode context
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(name);
+ }
+ return counter; // still null if neither branch matched
+ }
+
+ public Counter getCounter(String group, String name) { // same lookup as above, keyed by group and counter name
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(group, name);
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(group, name);
+ }
+ return counter; // still null if neither branch matched
+ }
+
+ public boolean incrCounter(Enum<?> name, long delta) { // adds delta to the enum counter; true only if a supported context handled it
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(name);
+ counter.increment(delta); // NOTE(review): counter is not null-checked before use - confirm getCounter() cannot return null here
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(name);
+ counter.increment(delta); // NOTE(review): same unchecked dereference as the MapReduce branch
+ return true;
+ } else if (context instanceof TezProcessorContext) { // Tez: the counter is a TezCounter obtained through getCounters()
+ TezProcessorContext tezContext = (TezProcessorContext) context;
+ TezCounter counter = tezContext.getCounters().findCounter(name);
+ counter.increment(delta);
+ return true;
+ }
+ return false; // unsupported context type: nothing was incremented
+ }
+
+ public boolean incrCounter(String group, String name, long delta) { // group/name variant of incrCounter; same return contract
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(group, name);
+ counter.increment(delta); // NOTE(review): counter is not null-checked before use - confirm getCounter() cannot return null here
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(group, name);
+ counter.increment(delta); // NOTE(review): same unchecked dereference as the MapReduce branch
+ return true;
+ } else if (context instanceof TezProcessorContext) { // Tez: resolve the group first, then the counter inside it
+ TezProcessorContext tezContext = (TezProcessorContext) context;
+ TezCounter counter = tezContext.getCounters().getGroup(group).findCounter(name);
+ counter.increment(delta);
+ return true;
+ }
+ return false; // unsupported context type: nothing was incremented
+ }
+
+ public void progress() { // calls progress() on a MapReduce context; does nothing for null, fetch or Tez contexts
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.progress();
+ }
+ }
+
+ public float getProgress() { // returns the MapReduce context's progress; 0 for null or any other context type
+ if (context == null) {
+ return 0f;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ return taskContext.getProgress();
+ }
+ return 0f; // non-MapReduce contexts report no progress here
+ }
+
+ public void setStatus(String status) { // forwards the status string to a MapReduce context; does nothing for null or other types
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.setStatus(status);
+ }
+ }
+}
Modified: pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java Mon Apr 28 21:59:43 2014
@@ -18,7 +18,6 @@
package org.apache.pig;
import com.google.common.collect.Maps;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -161,9 +160,6 @@ public abstract class TypedOutputEvalFun
}
protected static void safeIncrCounter(String group, String name, Long increment) {
- Counter counter = PigStatusReporter.getInstance().getCounter(group, name);
- if (counter != null) {
- counter.increment(increment);
- }
+ PigStatusReporter.getInstance().incrCounter(group, name, increment);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Apr 28 21:59:43 2014
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
@@ -116,7 +117,6 @@ public class FetchLauncher {
}
private void init(PhysicalPlan pp, POStore poStore) throws IOException {
-
poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
poStore.setUp();
@@ -147,10 +147,11 @@ public class FetchLauncher {
}
boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.getInstance().setFetchContext(new FetchContext());
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Mon Apr 28 21:59:43 2014
@@ -29,6 +29,7 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -36,48 +37,45 @@ import org.apache.pig.tools.pigstats.Pig
* collector/record writer. It sets up a modified job configuration to
* force a write to a specific subdirectory of the main output
* directory. This is done so that multiple output directories can be
- * used in the same job.
+ * used in the same job.
*/
-@SuppressWarnings("unchecked")
public class MapReducePOStoreImpl extends POStoreImpl {
-
+
private TaskAttemptContext context;
-
private PigStatusReporter reporter;
+ private RecordWriter<?,?> writer;
- private RecordWriter writer;
-
- public MapReducePOStoreImpl(TaskInputOutputContext context) {
+ public MapReducePOStoreImpl(TaskInputOutputContext<?,?,?,?> context) {
// get a copy of the Configuration so that changes to the
// configuration below (like setting the output location) do
// not affect the caller's copy
Configuration outputConf = new Configuration(context.getConfiguration());
- PigStatusReporter.setContext(context);
reporter = PigStatusReporter.getInstance();
-
+ reporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+
// make a copy of the Context to use here - since in the same
// task (map or reduce) we could have multiple stores, we should
// make this copy so that the same context does not get over-written
// by the different stores.
-
- this.context = HadoopShims.createTaskAttemptContext(outputConf,
+
+ this.context = HadoopShims.createTaskAttemptContext(outputConf,
context.getTaskAttemptID());
}
-
+
@Override
- public StoreFuncInterface createStoreFunc(POStore store)
+ public StoreFuncInterface createStoreFunc(POStore store)
throws IOException {
-
+
StoreFuncInterface storeFunc = store.getStoreFunc();
// call the setStoreLocation on the storeFunc giving it the
// Job. Typically this will result in the OutputFormat of the
// storeFunc storing the output location in the Configuration
- // in the Job. The PigOutFormat.setLocation() method will merge
+ // in the Job. The PigOutFormat.setLocation() method will merge
// this modified Configuration into the configuration of the
// Context we have
PigOutputFormat.setLocation(context, store);
- OutputFormat outputFormat = storeFunc.getOutputFormat();
+ OutputFormat<?,?> outputFormat = storeFunc.getOutputFormat();
// create a new record writer
try {
@@ -85,9 +83,9 @@ public class MapReducePOStoreImpl extend
} catch (InterruptedException e) {
throw new IOException(e);
}
-
+
storeFunc.prepareToWrite(writer);
-
+
return storeFunc;
}
@@ -114,10 +112,10 @@ public class MapReducePOStoreImpl extend
writer = null;
}
}
-
+
public Counter createRecordCounter(POStore store) {
String name = MRPigStatsUtil.getMultiStoreCounterName(store);
return (name == null) ? null : reporter.getCounter(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Mon Apr 28 21:59:43 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
@@ -139,12 +141,11 @@ public class PigCombiner {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Mon Apr 28 21:59:43 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -41,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.SchemaTupleBackend;
@@ -64,9 +66,9 @@ import org.apache.pig.tools.pigstats.Pig
public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
private final Log log = LogFactory.getLog(getClass());
-
+
protected byte keyType;
-
+
//Map Plan
protected PhysicalPlan mp = null;
@@ -74,24 +76,24 @@ public abstract class PigGenericMapBase
protected List<POStore> stores;
protected TupleFactory tf = TupleFactory.getInstance();
-
+
boolean inIllustrator = false;
-
+
Context outputCollector;
-
+
// Reporter that will be used by operators
// to transmit heartbeat
ProgressableReporter pigReporter;
protected boolean errorInMap = false;
-
+
PhysicalOperator[] roots;
private PhysicalOperator leaf;
PigContext pigContext = null;
private volatile boolean initialized = false;
-
+
/**
* for local map/reduce simulation
* @param plan the map plan
@@ -99,7 +101,7 @@ public abstract class PigGenericMapBase
public void setMapPlan(PhysicalPlan plan) {
mp = plan;
}
-
+
/**
* Will be called when all the tuples in the input
* are done. So reporter thread should be closed.
@@ -111,9 +113,9 @@ public abstract class PigGenericMapBase
//error in map - returning
return;
}
-
+
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
- // If there is a stream in the pipeline or if this map job belongs to merge-join we could
+ // If there is a stream in the pipeline or if this map job belongs to merge-join we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
// already and then lets run the pipeline one more time
@@ -126,7 +128,7 @@ public abstract class PigGenericMapBase
if (!inIllustrator) {
for (POStore store: stores) {
if (!initialized) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
@@ -134,7 +136,7 @@ public abstract class PigGenericMapBase
store.tearDown();
}
}
-
+
//Calling EvalFunc.finish()
UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
try {
@@ -144,7 +146,7 @@ public abstract class PigGenericMapBase
String msg = "Error while calling finish method on UDFs.";
throw new VisitorException(msg, errCode, PigException.BUG, e);
}
-
+
mp = null;
PhysicalOperator.setReporter(null);
@@ -159,14 +161,14 @@ public abstract class PigGenericMapBase
@Override
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
-
+
Configuration job = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
PigMapReduce.sJobContext = context;
PigMapReduce.sJobConfInternal.set(context.getConfiguration());
PigMapReduce.sJobConf = context.getConfiguration();
inIllustrator = inIllustrator(context);
-
+
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
@@ -175,12 +177,12 @@ public abstract class PigGenericMapBase
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
-
+
if (mp == null)
mp = (PhysicalPlan) ObjectSerializer.deserialize(
job.get("pig.mapPlan"));
stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
-
+
// To be removed
if(mp.isEmpty())
log.debug("Map Plan empty!");
@@ -191,7 +193,7 @@ public abstract class PigGenericMapBase
}
keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
// till here
-
+
pigReporter = new ProgressableReporter();
// Get the UDF specific context
MapRedUtil.setupUDFContext(job);
@@ -200,26 +202,27 @@ public abstract class PigGenericMapBase
PigSplit split = (PigSplit)context.getInputSplit();
List<OperatorKey> targetOpKeys = split.getTargetOps();
-
+
ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
- for (OperatorKey targetKey : targetOpKeys) {
+ for (OperatorKey targetKey : targetOpKeys) {
targetOpsAsList.add(mp.getOperator(targetKey));
}
roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
- leaf = mp.getLeaves().get(0);
+ leaf = mp.getLeaves().get(0);
}
-
- PigStatusReporter.setContext(context);
-
+
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
-
+
String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
}
-
+
/**
* The map function that attaches the inpTuple appropriately
* and executes the map plan if its not empty. Collects the
@@ -228,9 +231,9 @@ public abstract class PigGenericMapBase
* map-only or map-reduce job to implement. Map-only collects
* the tuple as-is whereas map-reduce collects it after extracting
* the key and indexed tuple.
- */
+ */
@Override
- protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
+ protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
if(!initialized) {
initialized = true;
// cache the collector for use in runPipeline() which
@@ -238,32 +241,31 @@ public abstract class PigGenericMapBase
this.outputCollector = context;
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
-
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+
if (!inIllustrator) {
for (POStore store: stores) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
if (!pigContext.inIllustrator)
store.setUp();
}
}
-
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
- PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
- PhysicalOperator.setPigLogger(pigHadoopLogger);
-
}
-
+
if (mp.isEmpty()) {
collect(context,inpTuple);
return;
}
-
+
for (PhysicalOperator root : roots) {
if (inIllustrator) {
if (root != null) {
@@ -273,7 +275,7 @@ public abstract class PigGenericMapBase
root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
}
}
-
+
runPipeline(leaf);
}
@@ -284,16 +286,16 @@ public abstract class PigGenericMapBase
collect(outputCollector,(Tuple)res.result);
continue;
}
-
+
if(res.returnStatus==POStatus.STATUS_EOP) {
return;
}
-
+
if(res.returnStatus==POStatus.STATUS_NULL)
continue;
-
+
if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
+ // remember that we had an issue so that in
// close() we can do the right thing
errorInMap = true;
// if there is an errmessage use it
@@ -305,19 +307,19 @@ public abstract class PigGenericMapBase
errMsg = "Received Error while " +
"processing the map plan.";
}
-
+
int errCode = 2055;
ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
throw ee;
}
}
-
+
}
abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
abstract public boolean inIllustrator(Context context);
-
+
/**
* @return the keyType
*/
@@ -331,7 +333,7 @@ public abstract class PigGenericMapBase
public void setKeyType(byte keyType) {
this.keyType = keyType;
}
-
+
abstract public Context getIllustratorContext(Configuration conf, DataBag input,
List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
throws IOException, InterruptedException;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Mon Apr 28 21:59:43 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
@@ -63,20 +65,20 @@ import org.joda.time.DateTimeZone;
/**
* This class is the static Mapper & Reducer classes that
* are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a
+ * there is a reduce phase, the leaf is bound to be a
* POLocalRearrange. So the map phase has to separate the
* key and tuple and collect it into the output
* collector.
- *
+ *
* The shuffle and sort phase sorts these keys & tuples
* and creates key, List<Tuple> and passes the key and
* iterator to the list. The deserialized POPackage operator
- * is used to package the key, List<Tuple> into pigKey,
+ * is used to package the key, List<Tuple> into pigKey,
* Bag<Tuple> where pigKey is of the appropriate pig type and
* then the result of the package is attached to the reduce
- * plan which is executed if its not empty. Either the result
+ * plan which is executed if it's not empty. Either the result
* of the reduce plan or the package res is collected into
- * the output collector.
+ * the output collector.
*
* The index of the tuple (that is, which bag it should be placed in by the
* package) is packed into the key. This is done so that hadoop sorts the
@@ -89,28 +91,28 @@ import org.joda.time.DateTimeZone;
public class PigGenericMapReduce {
public static JobContext sJobContext = null;
-
+
/**
- * @deprecated Use {@link UDFContext} instead in the following way to get
+ * @deprecated Use {@link UDFContext} instead in the following way to get
* the job's {@link Configuration}:
* <pre>UdfContext.getUdfContext().getJobConf()</pre>
*/
@Deprecated
public static Configuration sJobConf = null;
-
+
public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
-
+
public static class Map extends PigMapBase {
@Override
- public void collect(Context oc, Tuple tuple)
+ public void collect(Context oc, Tuple tuple)
throws InterruptedException, IOException {
-
+
Byte index = (Byte)tuple.get(0);
PigNullableWritable key =
HDataType.getWritableComparableTypes(tuple.get(1), keyType);
NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
@@ -121,7 +123,7 @@ public class PigGenericMapReduce {
oc.write(key, val);
}
}
-
+
/**
* This "specialized" map class is ONLY to be used in pig queries with
* order by a udf. A UDF used for comparison in the order by expects
@@ -131,9 +133,9 @@ public class PigGenericMapReduce {
public static class MapWithComparator extends PigMapBase {
@Override
- public void collect(Context oc, Tuple tuple)
+ public void collect(Context oc, Tuple tuple)
throws InterruptedException, IOException {
-
+
Object keyTuple = null;
if(keyType != DataType.TUPLE) {
Object k = tuple.get(1);
@@ -141,13 +143,13 @@ public class PigGenericMapReduce {
} else {
keyTuple = tuple.get(1);
}
-
+
Byte index = (Byte)tuple.get(0);
PigNullableWritable key =
HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
@@ -165,9 +167,9 @@ public class PigGenericMapReduce {
public static class MapWithPartitionIndex extends Map {
@Override
- public void collect(Context oc, Tuple tuple)
+ public void collect(Context oc, Tuple tuple)
throws InterruptedException, IOException {
-
+
Byte tupleKeyIdx = 2;
Byte tupleValIdx = 3;
@@ -189,13 +191,13 @@ public class PigGenericMapReduce {
NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
-
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
wrappedKey.setIndex(index);
-
+
// set the partition
wrappedKey.setPartition(partitionIndex);
val.setIndex(index);
@@ -203,14 +205,14 @@ public class PigGenericMapReduce {
}
@Override
- protected void runPipeline(PhysicalOperator leaf)
+ protected void runPipeline(PhysicalOperator leaf)
throws IOException, InterruptedException {
-
+
while(true){
Result res = leaf.getNextTuple();
-
+
if(res.returnStatus==POStatus.STATUS_OK){
- // For POPartitionRearrange, the result is a bag.
+ // For POPartitionRearrange, the result is a bag.
// This operator is used for skewed join
if (res.result instanceof DataBag) {
Iterator<Tuple> its = ((DataBag)res.result).iterator();
@@ -222,7 +224,7 @@ public class PigGenericMapReduce {
}
continue;
}
-
+
if(res.returnStatus==POStatus.STATUS_EOP) {
return;
}
@@ -232,7 +234,7 @@ public class PigGenericMapReduce {
}
if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
+ // remember that we had an issue so that in
// close() we can do the right thing
errorInMap = true;
// if there is an errmessage use it
@@ -252,39 +254,39 @@ public class PigGenericMapReduce {
}
}
- abstract public static class Reduce
+ abstract public static class Reduce
extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+
protected final Log log = LogFactory.getLog(getClass());
-
+
//The reduce plan
protected PhysicalPlan rp = null;
// Store operators
protected List<POStore> stores;
-
+
//The POPackage operator which is the
//root of every Map Reduce plan is
//obtained through the job conf. The portion
//remaining after its removal is the reduce
//plan
protected POPackage pack;
-
+
ProgressableReporter pigReporter;
protected Context outputCollector;
protected boolean errorInReduce = false;
-
+
PhysicalOperator[] roots;
private PhysicalOperator leaf;
-
+
PigContext pigContext = null;
protected volatile boolean initialized = false;
-
+
private boolean inIllustrator = false;
-
+
/**
* Set the reduce plan: to be used by local runner for illustrator
* @param plan Reduce plan
@@ -312,7 +314,7 @@ public class PigGenericMapReduce {
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-
+
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
SchemaTupleBackend.initialize(jConf, pigContext);
@@ -336,36 +338,37 @@ public class PigGenericMapReduce {
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
leaf = rp.getLeaves().get(0);
}
-
+
// Get the UDF specific context
MapRedUtil.setupUDFContext(jConf);
-
+
} catch (IOException ioe) {
String msg = "Problem while configuring reduce plan.";
throw new RuntimeException(msg, ioe);
}
+
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
-
+
String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
}
-
+
/**
* The reduce function which packages the key and List<Tuple>
* into key, Bag<Tuple> after converting Hadoop type key into Pig type.
* The package result is either collected as is, if the reduce plan is
* empty or after passing through the reduce plan.
- */
+ */
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
- throws IOException, InterruptedException {
-
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ throws IOException, InterruptedException {
+
if (!initialized) {
initialized = true;
-
+
// cache the collector for use in runPipeline()
// which could additionally be called from close()
this.outputCollector = context;
@@ -373,27 +376,26 @@ public class PigGenericMapReduce {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
PhysicalOperator.setPigLogger(pigHadoopLogger);
if (!inIllustrator)
for (POStore store: stores) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
}
-
+
// In the case we optimize the join, we combine
// POPackage and POForeach - so we could get many
// tuples out of the getnext() call of POJoinPackage
- // In this case, we process till we see EOP from
+ // In this case, we process till we see EOP from
// POJoinPacakage.getNext()
if (pack.getPkgr() instanceof JoinPackager)
{
@@ -409,18 +411,18 @@ public class PigGenericMapReduce {
// give only one tuple out for the key
pack.attachInput(key, tupIter.iterator());
processOnePackageOutput(context);
- }
+ }
}
-
+
// return: false-more output
// true- end of processing
- public boolean processOnePackageOutput(Context oc)
+ public boolean processOnePackageOutput(Context oc)
throws IOException, InterruptedException {
Result res = pack.getNextTuple();
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
-
+
if(rp.isEmpty()){
oc.write(null, packRes);
return false;
@@ -429,35 +431,35 @@ public class PigGenericMapReduce {
roots[i].attachInput(packRes);
}
runPipeline(leaf);
-
+
}
-
+
if(res.returnStatus==POStatus.STATUS_NULL) {
return false;
}
-
+
if(res.returnStatus==POStatus.STATUS_ERR){
int errCode = 2093;
String msg = "Encountered error in package operator while processing group.";
throw new ExecException(msg, errCode, PigException.BUG);
}
-
+
if(res.returnStatus==POStatus.STATUS_EOP) {
return true;
}
-
+
return false;
-
+
}
-
+
/**
* @param leaf
* @throws InterruptedException
- * @throws IOException
+ * @throws IOException
*/
- protected void runPipeline(PhysicalOperator leaf)
+ protected void runPipeline(PhysicalOperator leaf)
throws InterruptedException, IOException {
-
+
while(true)
{
Result redRes = leaf.getNextTuple();
@@ -469,17 +471,17 @@ public class PigGenericMapReduce {
}
continue;
}
-
+
if(redRes.returnStatus==POStatus.STATUS_EOP) {
return;
}
-
+
if(redRes.returnStatus==POStatus.STATUS_NULL) {
continue;
}
-
+
if(redRes.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
+ // remember that we had an issue so that in
// close() we can do the right thing
errorInReduce = true;
// if there is an errmessage use it
@@ -496,22 +498,22 @@ public class PigGenericMapReduce {
}
}
}
-
+
/**
* Will be called once all the intermediate keys and values are
* processed. So right place to stop the reporter thread.
*/
- @Override
+ @Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
-
+
if(errorInReduce) {
// there was an error in reduce - just return
return;
}
-
+
if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
- // If there is a stream in the pipeline we could
+ // If there is a stream in the pipeline we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
// already and then lets run the pipeline one more time
@@ -524,7 +526,7 @@ public class PigGenericMapReduce {
if (!inIllustrator) {
for (POStore store: stores) {
if (!initialized) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
@@ -532,7 +534,7 @@ public class PigGenericMapReduce {
store.tearDown();
}
}
-
+
//Calling EvalFunc.finish()
UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
try {
@@ -540,14 +542,14 @@ public class PigGenericMapReduce {
} catch (VisitorException e) {
throw new IOException("Error trying to finish UDFs",e);
}
-
+
PhysicalOperator.setReporter(null);
initialized = false;
}
-
+
/**
* Get reducer's illustrator context
- *
+ *
* @param input Input buffer as output by maps
* @param pkg package
* @return reducer's illustrator context
@@ -556,25 +558,25 @@ public class PigGenericMapReduce {
*/
abstract public Context getIllustratorContext(Job job,
List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException;
-
+
abstract public boolean inIllustrator(Context context);
-
+
abstract public POPackage getPack(Context context);
}
-
+
/**
* This "specialized" reduce class is ONLY to be used in pig queries with
* order by a udf. A UDF used for comparison in the order by expects
* to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
- * ensures that the "key" used in the order by is wrapped into a tuple (if it
+ * ensures that the "key" used in the order by is wrapped into a tuple (if it
* isn't already a tuple). This reduce class unwraps this tuple in the case where
* the map had wrapped into a tuple and handes the "unwrapped" key to the POPackage
* for processing
*/
public static class ReduceWithComparator extends PigMapReduce.Reduce {
-
+
private byte keyType;
-
+
/**
* Configures the Reduce plan, the POPackage operator
* and the reporter thread
@@ -592,12 +594,12 @@ public class PigGenericMapReduce {
* empty or after passing through the reduce plan.
*/
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
throws IOException, InterruptedException {
-
+
if (!initialized) {
initialized = true;
-
+
// cache the collector for use in runPipeline()
// which could additionally be called from close()
this.outputCollector = context;
@@ -605,24 +607,23 @@ public class PigGenericMapReduce {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
PhysicalOperator.setPigLogger(pigHadoopLogger);
-
+
for (POStore store: stores) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
}
-
+
// If the keyType is not a tuple, the MapWithComparator.collect()
- // would have wrapped the key into a tuple so that the
+ // would have wrapped the key into a tuple so that the
// comparison UDF used in the order by can process it.
// We need to unwrap the key out of the tuple and hand it
// to the POPackage for processing
@@ -634,31 +635,31 @@ public class PigGenericMapReduce {
throw e;
}
}
-
+
pack.attachInput(key, tupIter.iterator());
-
+
Result res = pack.getNextTuple();
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
-
+
if(rp.isEmpty()){
context.write(null, packRes);
return;
}
-
+
rp.attachInput(packRes);
List<PhysicalOperator> leaves = rp.getLeaves();
-
+
PhysicalOperator leaf = leaves.get(0);
runPipeline(leaf);
-
+
}
-
+
if(res.returnStatus==POStatus.STATUS_NULL) {
return;
}
-
+
if(res.returnStatus==POStatus.STATUS_ERR){
int errCode = 2093;
String msg = "Encountered error in package operator while processing group.";
@@ -668,5 +669,5 @@ public class PigGenericMapReduce {
}
}
-
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Mon Apr 28 21:59:43 2014
@@ -35,25 +35,38 @@ import org.apache.pig.tools.pigstats.Pig
* warning messages
*/
public final class PigHadoopLogger implements PigLogger {
- private static class PigHadoopLoggerHelper {
- private static PigHadoopLogger instance = new PigHadoopLogger();
- }
-
- public static PigHadoopLogger getInstance() {
- return PigHadoopLoggerHelper.instance;
- }
private static Log log = LogFactory.getLog(PigHadoopLogger.class);
+ private static PigHadoopLogger logger = null;
private PigStatusReporter reporter = null;
-
private boolean aggregate = false;
-
private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
private PigHadoopLogger() {
}
+ /**
+ * Get singleton instance of the logger
+ */
+ public static PigHadoopLogger getInstance() {
+ if (logger == null) {
+ logger = new PigHadoopLogger();
+ }
+ return logger;
+ }
+
+ public void destory() {
+ if (reporter != null) {
+ reporter.destory();
+ }
+ reporter = null;
+ }
+
+ public void setReporter(PigStatusReporter reporter) {
+ this.reporter = reporter;
+ }
+
@SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
String className = o.getClass().getName();
@@ -61,15 +74,15 @@ public final class PigHadoopLogger imple
if (getAggregate()) {
if (reporter != null) {
- // log atleast once
+ // log at least once
if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
log.warn(displayMessage);
msgMap.put(o, displayMessage);
}
if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
- reporter.getCounter(className, warningEnum.name()).increment(1);
+ reporter.incrCounter(className, warningEnum.name(), 1);
} else {
- reporter.getCounter(warningEnum).increment(1);
+ reporter.incrCounter(warningEnum, 1);
}
} else {
//TODO:
@@ -87,10 +100,6 @@ public final class PigHadoopLogger imple
}
}
- public synchronized void setReporter(PigStatusReporter rep) {
- this.reporter = rep;
- }
-
public synchronized boolean getAggregate() {
return aggregate;
}
@@ -98,5 +107,4 @@ public final class PigHadoopLogger imple
public synchronized void setAggregate(boolean aggregate) {
this.aggregate = aggregate;
}
-
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Mon Apr 28 21:59:43 2014
@@ -75,10 +75,9 @@ public class PigMapReduceCounter {
try {
PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null) {
- reporter.getCounter(
+ reporter.incrCounter(
JobControlCompiler.PIG_MAP_RANK_NAME
- + context.getJobID().toString(), taskID)
- .increment(1);
+ + context.getJobID().toString(), taskID, 1);
}
} catch (Exception ex) {
log.error("Error on incrementer of PigMapCounter");
@@ -135,10 +134,9 @@ public class PigMapReduceCounter {
if (reporter != null) {
if(leaf instanceof POCounter){
- reporter.getCounter(
+ reporter.incrCounter(
JobControlCompiler.PIG_MAP_RANK_NAME
- + context.getJobID().toString(), taskID).increment(increment);
-
+ + context.getJobID().toString(), taskID, increment);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Mon Apr 28 21:59:43 2014
@@ -56,7 +56,7 @@ public class PigRecordReader extends Rec
private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
private final static String TIMING_COUNTER = "approx_microsecs";
- private final static int TIMING_FREQ = 100;
+ private final static long TIMING_FREQ = 100;
transient private String counterGroup = "";
private boolean doTiming = false;
@@ -214,8 +214,8 @@ public class PigRecordReader extends Rec
}
}
if (timeThis) {
- PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
- ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+ PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+ Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
}
recordCount++;
return true;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Mon Apr 28 21:59:43 2014
@@ -27,9 +27,8 @@ import org.apache.pig.classification.Int
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface PigLogger {
-
- /**
- * If you have warning messages that need aggregation
- */
- public void warn(Object o, String msg, Enum warningEnum);
+ /**
+ * If you have warning messages that need aggregation
+ */
+ public void warn(Object o, String msg, Enum<?> warningEnum);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Apr 28 21:59:43 2014
@@ -61,7 +61,7 @@ public class POUserFunc extends Expressi
private static final Log LOG = LogFactory.getLog(POUserFunc.class);
private final static String TIMING_COUNTER = "approx_microsecs";
private final static String INVOCATION_COUNTER = "approx_invocations";
- private final static int TIMING_FREQ = 100;
+ private final static long TIMING_FREQ = 100;
private final static TupleFactory tf = TupleFactory.getInstance();
private transient String counterGroup;
@@ -229,8 +229,8 @@ public class POUserFunc extends Expressi
if (knownSize) {
rslt.set(knownIndex++, trslt.get(i));
} else {
- rslt.append(trslt.get(i));
- }
+ rslt.append(trslt.get(i));
+ }
}
continue;
}
@@ -238,8 +238,8 @@ public class POUserFunc extends Expressi
if (knownSize) {
((Tuple)res.result).set(knownIndex++, temp.result);
} else {
- ((Tuple)res.result).append(temp.result);
- }
+ ((Tuple)res.result).append(temp.result);
+ }
}
res.returnStatus = temp.returnStatus;
@@ -273,8 +273,7 @@ public class POUserFunc extends Expressi
boolean timeThis = doTiming && (numInvocations++ % TIMING_FREQ == 0);
if (timeThis) {
startNanos = System.nanoTime();
- PigStatusReporter.getInstance().getCounter(counterGroup, INVOCATION_COUNTER).increment(TIMING_FREQ);
-
+ PigStatusReporter.getInstance().incrCounter(counterGroup, INVOCATION_COUNTER, TIMING_FREQ);
}
try {
if(result.returnStatus == POStatus.STATUS_OK) {
@@ -355,8 +354,8 @@ public class POUserFunc extends Expressi
}
}
if (timeThis) {
- PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
- ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+ PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+ Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
}
return result;
} catch (ExecException ee) {
@@ -404,13 +403,11 @@ public class POUserFunc extends Expressi
@Override
public Result getNextBoolean() throws ExecException {
-
return getNext();
}
@Override
public Result getNextDataByteArray() throws ExecException {
-
return getNext();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java Mon Apr 28 21:59:43 2014
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledThr
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.MonitoredUDF;
import org.apache.pig.data.Tuple;
@@ -136,20 +135,18 @@ public class MonitoredUDFExecutor implem
@SuppressWarnings("unchecked")
public static void handleError(EvalFunc evalFunc, Exception e) {
evalFunc.getLogger().error(e);
- StatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null &&
- reporter.getCounter(evalFunc.getClass().getName(), e.toString()) != null) {
- reporter.getCounter(evalFunc.getClass().getName(), e.toString()).increment(1L);
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(evalFunc.getClass().getName(), e.toString(), 1);
}
}
@SuppressWarnings("unchecked")
public static void handleTimeout(EvalFunc evalFunc, Exception e) {
evalFunc.getLogger().error(e);
- StatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null &&
- reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout") != null) {
- reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout").increment(1L);
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout", 1);
}
}
}
Modified: pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java (original)
+++ pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java Mon Apr 28 21:59:43 2014
@@ -20,7 +20,6 @@ package org.apache.pig.tools.counters;
import java.util.Map;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -33,53 +32,51 @@ import com.google.common.collect.Maps;
* stored counters each time it does.
*/
public class PigCounterHelper {
- private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
- private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
+ private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
+ private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
- /**
- * Mocks the Reporter.incrCounter, but adds buffering.
- * See org.apache.hadoop.mapred.Reporter's incrCounter.
- */
- public void incrCounter(String group, String counterName, long incr) {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null) { // common case
- Counter counter = reporter.getCounter(group, counterName);
- if (counter != null) {
- counter.increment(incr);
-
- if (counterStringMap_.size() > 0) {
- for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
- reporter.getCounter(entry.getKey().first, entry.getKey().second).increment(entry.getValue());
- }
- counterStringMap_.clear();
+ /**
+ * Mocks the Reporter.incrCounter, but adds buffering.
+ * See org.apache.hadoop.mapred.Reporter's incrCounter.
+ */
+ public void incrCounter(String group, String counterName, long incr) {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null && reporter.incrCounter(group, counterName, incr)) { // common case
+ if (counterStringMap_.size() > 0) {
+ for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
+ reporter.incrCounter(entry.getKey().first, entry.getKey().second, entry.getValue());
+ }
+ counterStringMap_.clear();
+ }
+ return;
}
- return;
- }
+
+ // In the case when reporter is not available, or we can't get the
+ // Counter, store in the local map.
+ Pair<String, String> key = new Pair<String, String>(group, counterName);
+ Long currentValue = counterStringMap_.get(key);
+ counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
}
- // In the case when reporter is not available, or we can't get the Counter,
- // store in the local map.
- Pair<String, String> key = new Pair<String, String>(group, counterName);
- Long currentValue = counterStringMap_.get(key);
- counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
- }
-
- /**
- * Mocks the Reporter.incrCounter, but adds buffering.
- * See org.apache.hadoop.mapred.Reporter's incrCounter.
- */
- public void incrCounter(Enum<?> key, long incr) {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null && reporter.getCounter(key) != null) {
- reporter.getCounter(key).increment(incr);
- if (counterEnumMap_.size() > 0) {
- for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
- reporter.getCounter(entry.getKey()).increment(entry.getValue());
+
+ /**
+ * Mocks the Reporter.incrCounter, but adds buffering.
+ * See org.apache.hadoop.mapred.Reporter's incrCounter.
+ */
+ public void incrCounter(Enum<?> key, long incr) {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null && reporter.incrCounter(key, incr)) { // common case
+ if (counterEnumMap_.size() > 0) {
+ for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
+ reporter.incrCounter(entry.getKey(), entry.getValue());
+ }
+ counterEnumMap_.clear();
+ }
+ return;
}
- counterEnumMap_.clear();
- }
- } else { // buffer the increments
- Long currentValue = counterEnumMap_.get(key);
- counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
+
+ // In the case when reporter is not available, or we can't get the
+ // Counter, store in the local map.
+ Long currentValue = counterEnumMap_.get(key);
+ counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
}
- }
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Mon Apr 28 21:59:43 2014
@@ -20,80 +20,85 @@ package org.apache.pig.tools.pigstats;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.Progressable;
-import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
-@SuppressWarnings("unchecked")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class PigStatusReporter extends StatusReporter implements Progressable {
- private TaskInputOutputContext context;
- private FetchContext fetchContext;
-
private static PigStatusReporter reporter = null;
+ private TaskContext<?> context = null;
+
+ private PigStatusReporter() {
+ }
+
/**
* Get singleton instance of the context
*/
public static PigStatusReporter getInstance() {
if (reporter == null) {
- reporter = new PigStatusReporter(null);
+ reporter = new PigStatusReporter();
}
return reporter;
}
- public static void setContext(TaskInputOutputContext context) {
- reporter = new PigStatusReporter(context);
+ public void destory() {
+ context = null;
}
- private PigStatusReporter(TaskInputOutputContext context) {
+ public void setContext(TaskContext<?> context) {
this.context = context;
}
+ /**
+ * @deprecated Use {@link org.apache.pig.tools.pigstats.PigStatusReporter#incrCounter} instead.
+ * This method returns an MR counter, which is not compatible with Tez mode. Use
+ * incrCounter(), which is compatible with both MR and Tez modes.
+ */
@Override
+ @Deprecated
public Counter getCounter(Enum<?> name) {
- if (fetchContext != null) {
- return fetchContext.getCounter(name);
- }
- return (context == null) ? null : context.getCounter(name);
+ return (context == null) ? null : context.getCounter(name);
}
+ /**
+ * @deprecated Use {@link org.apache.pig.tools.pigstats.PigStatusReporter#incrCounter} instead.
+ * This method returns an MR counter, which is not compatible with Tez mode. Use
+ * incrCounter(), which is compatible with both MR and Tez modes.
+ */
@Override
+ @Deprecated
public Counter getCounter(String group, String name) {
- if (fetchContext != null) {
- return fetchContext.getCounter(group, name);
- }
- return (context == null) ? null : context.getCounter(group, name);
+ return context == null ? null : context.getCounter(group, name);
+ }
+
+ public boolean incrCounter(Enum<?> name, long incr) {
+ return context == null ? false : context.incrCounter(name, incr);
+ }
+
+ public boolean incrCounter(String group, String name, long incr) {
+ return context == null ? false : context.incrCounter(group, name, incr);
}
@Override
public void progress() {
- if (fetchContext == null && context != null) {
+ if (context != null) {
context.progress();
}
}
@Override
public void setStatus(String status) {
- if (fetchContext == null && context != null) {
+ if (context != null) {
context.setStatus(status);
}
}
public float getProgress() {
- return 0;
+ return context == null ? 0f : context.getProgress();
}
-
- /**
- * Sets a dummy counter handler for fetch tasks
- * @param fetchContext
- */
- public void setFetchContext(FetchContext fetchContext) {
- this.fetchContext = fetchContext;
- }
-
}