You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/29 00:02:46 UTC
svn commit: r1590822 - in /pig/branches/tez:
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/execution...
Author: cheolsoo
Date: Mon Apr 28 22:02:45 2014
New Revision: 1590822
URL: http://svn.apache.org/r1590822
Log:
PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
Added:
pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
Removed:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java
Modified:
pig/branches/tez/src/org/apache/pig/TypedOutputEvalFunc.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/tools/counters/PigCounterHelper.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Added: pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java?rev=1590822&view=auto
==============================================================================
--- pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java (added)
+++ pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java Mon Apr 28 22:02:45 2014
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+
+public class TaskContext<T> {
+ private T context;
+
+ public TaskContext(T context) {
+ this.context = context;
+ }
+
+ public T get() {
+ return context;
+ }
+
+ public Counter getCounter(Enum<?> name) {
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(name);
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(name);
+ }
+ return counter;
+ }
+
+ public Counter getCounter(String group, String name) {
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(group, name);
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(group, name);
+ }
+ return counter;
+ }
+
+ public boolean incrCounter(Enum<?> name, long delta) {
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(name);
+ counter.increment(delta);
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(name);
+ counter.increment(delta);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean incrCounter(String group, String name, long delta) {
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(group, name);
+ counter.increment(delta);
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(group, name);
+ counter.increment(delta);
+ return true;
+ }
+ return false;
+ }
+
+ public void progress() {
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.progress();
+ }
+ }
+
+ public float getProgress() {
+ return 0f;
+ }
+
+ public void setStatus(String status) {
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.setStatus(status);
+ }
+ }
+}
Added: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java?rev=1590822&view=auto
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java (added)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java Mon Apr 28 22:02:45 2014
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+public class TaskContext<T> {
+ private T context;
+
+ public TaskContext(T context) {
+ this.context = context;
+ }
+
+ public T get() {
+ return context;
+ }
+
+ public Counter getCounter(Enum<?> name) {
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(name);
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(name);
+ }
+ return counter;
+ }
+
+ public Counter getCounter(String group, String name) {
+ Counter counter = null;
+ if (context == null) {
+ return counter;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ counter = taskContext.getCounter(group, name);
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ counter = fetchContext.getCounter(group, name);
+ }
+ return counter;
+ }
+
+ public boolean incrCounter(Enum<?> name, long delta) {
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(name);
+ counter.increment(delta);
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(name);
+ counter.increment(delta);
+ return true;
+ } else if (context instanceof TezProcessorContext) {
+ TezProcessorContext tezContext = (TezProcessorContext) context;
+ TezCounter counter = tezContext.getCounters().findCounter(name);
+ counter.increment(delta);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean incrCounter(String group, String name, long delta) {
+ if (context == null) {
+ return false;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ Counter counter = taskContext.getCounter(group, name);
+ counter.increment(delta);
+ return true;
+ } else if (context instanceof FetchContext) {
+ FetchContext fetchContext = (FetchContext) context;
+ Counter counter = fetchContext.getCounter(group, name);
+ counter.increment(delta);
+ return true;
+ } else if (context instanceof TezProcessorContext) {
+ TezProcessorContext tezContext = (TezProcessorContext) context;
+ TezCounter counter = tezContext.getCounters().getGroup(group).findCounter(name);
+ counter.increment(delta);
+ return true;
+ }
+ return false;
+ }
+
+ public void progress() {
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.progress();
+ }
+ }
+
+ public float getProgress() {
+ if (context == null) {
+ return 0f;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ return taskContext.getProgress();
+ }
+ return 0f;
+ }
+
+ public void setStatus(String status) {
+ if (context == null) {
+ return;
+ }
+ if (context instanceof TaskInputOutputContext) {
+ TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+ taskContext.setStatus(status);
+ }
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/TypedOutputEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/TypedOutputEvalFunc.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/TypedOutputEvalFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/TypedOutputEvalFunc.java Mon Apr 28 22:02:45 2014
@@ -18,7 +18,6 @@
package org.apache.pig;
import com.google.common.collect.Maps;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -161,9 +160,6 @@ public abstract class TypedOutputEvalFun
}
protected static void safeIncrCounter(String group, String name, Long increment) {
- Counter counter = PigStatusReporter.getInstance().getCounter(group, name);
- if (counter != null) {
- counter.increment(increment);
- }
+ PigStatusReporter.getInstance().incrCounter(group, name, increment);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Apr 28 22:02:45 2014
@@ -35,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -109,7 +110,6 @@ public class FetchLauncher {
}
private void init(PhysicalPlan pp, POStore poStore) throws IOException {
-
poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
poStore.setUp();
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
@@ -130,12 +130,13 @@ public class FetchLauncher {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
-
+
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.getInstance().setFetchContext(new FetchContext());
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Mon Apr 28 22:02:45 2014
@@ -29,6 +29,7 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -36,48 +37,45 @@ import org.apache.pig.tools.pigstats.Pig
* collector/record writer. It sets up a modified job configuration to
* force a write to a specific subdirectory of the main output
* directory. This is done so that multiple output directories can be
- * used in the same job.
+ * used in the same job.
*/
-@SuppressWarnings("unchecked")
public class MapReducePOStoreImpl extends POStoreImpl {
-
+
private TaskAttemptContext context;
-
private PigStatusReporter reporter;
+ private RecordWriter<?,?> writer;
- private RecordWriter writer;
-
- public MapReducePOStoreImpl(TaskInputOutputContext context) {
+ public MapReducePOStoreImpl(TaskInputOutputContext<?,?,?,?> context) {
// get a copy of the Configuration so that changes to the
// configuration below (like setting the output location) do
// not affect the caller's copy
Configuration outputConf = new Configuration(context.getConfiguration());
- PigStatusReporter.setContext(context);
reporter = PigStatusReporter.getInstance();
-
+ reporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+
// make a copy of the Context to use here - since in the same
// task (map or reduce) we could have multiple stores, we should
// make this copy so that the same context does not get over-written
// by the different stores.
-
- this.context = HadoopShims.createTaskAttemptContext(outputConf,
+
+ this.context = HadoopShims.createTaskAttemptContext(outputConf,
context.getTaskAttemptID());
}
-
+
@Override
- public StoreFuncInterface createStoreFunc(POStore store)
+ public StoreFuncInterface createStoreFunc(POStore store)
throws IOException {
-
+
StoreFuncInterface storeFunc = store.getStoreFunc();
// call the setStoreLocation on the storeFunc giving it the
// Job. Typically this will result in the OutputFormat of the
// storeFunc storing the output location in the Configuration
- // in the Job. The PigOutFormat.setLocation() method will merge
+ // in the Job. The PigOutFormat.setLocation() method will merge
// this modified Configuration into the configuration of the
// Context we have
PigOutputFormat.setLocation(context, store);
- OutputFormat outputFormat = storeFunc.getOutputFormat();
+ OutputFormat<?,?> outputFormat = storeFunc.getOutputFormat();
// create a new record writer
try {
@@ -85,9 +83,9 @@ public class MapReducePOStoreImpl extend
} catch (InterruptedException e) {
throw new IOException(e);
}
-
+
storeFunc.prepareToWrite(writer);
-
+
return storeFunc;
}
@@ -114,10 +112,10 @@ public class MapReducePOStoreImpl extend
writer = null;
}
}
-
+
public Counter createRecordCounter(POStore store) {
String name = MRPigStatsUtil.getMultiStoreCounterName(store);
return (name == null) ? null : reporter.getCounter(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Mon Apr 28 22:02:45 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -36,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
@@ -135,12 +137,11 @@ public class PigCombiner {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Mon Apr 28 22:02:45 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -41,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.SchemaTupleBackend;
@@ -64,9 +66,9 @@ import org.apache.pig.tools.pigstats.Pig
public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
private final Log log = LogFactory.getLog(getClass());
-
+
protected byte keyType;
-
+
//Map Plan
protected PhysicalPlan mp = null;
@@ -74,24 +76,24 @@ public abstract class PigGenericMapBase
protected List<POStore> stores;
protected TupleFactory tf = TupleFactory.getInstance();
-
+
boolean inIllustrator = false;
-
+
Context outputCollector;
-
+
// Reporter that will be used by operators
// to transmit heartbeat
ProgressableReporter pigReporter;
protected boolean errorInMap = false;
-
+
PhysicalOperator[] roots;
private PhysicalOperator leaf;
PigContext pigContext = null;
private volatile boolean initialized = false;
-
+
/**
* for local map/reduce simulation
* @param plan the map plan
@@ -99,7 +101,7 @@ public abstract class PigGenericMapBase
public void setMapPlan(PhysicalPlan plan) {
mp = plan;
}
-
+
/**
* Will be called when all the tuples in the input
* are done. So reporter thread should be closed.
@@ -111,9 +113,9 @@ public abstract class PigGenericMapBase
//error in map - returning
return;
}
-
+
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
- // If there is a stream in the pipeline or if this map job belongs to merge-join we could
+ // If there is a stream in the pipeline or if this map job belongs to merge-join we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
// already and then lets run the pipeline one more time
@@ -126,7 +128,7 @@ public abstract class PigGenericMapBase
if (!inIllustrator) {
for (POStore store: stores) {
if (!initialized) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
@@ -134,7 +136,7 @@ public abstract class PigGenericMapBase
store.tearDown();
}
}
-
+
//Calling EvalFunc.finish()
UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
try {
@@ -144,7 +146,7 @@ public abstract class PigGenericMapBase
String msg = "Error while calling finish method on UDFs.";
throw new VisitorException(msg, errCode, PigException.BUG, e);
}
-
+
mp = null;
PhysicalOperator.setReporter(null);
@@ -159,14 +161,14 @@ public abstract class PigGenericMapBase
@Override
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
-
+
Configuration job = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
PigMapReduce.sJobContext = context;
PigMapReduce.sJobConfInternal.set(context.getConfiguration());
PigMapReduce.sJobConf = context.getConfiguration();
inIllustrator = inIllustrator(context);
-
+
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
@@ -175,12 +177,12 @@ public abstract class PigGenericMapBase
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
-
+
if (mp == null)
mp = (PhysicalPlan) ObjectSerializer.deserialize(
job.get("pig.mapPlan"));
stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
-
+
// To be removed
if(mp.isEmpty())
log.debug("Map Plan empty!");
@@ -191,7 +193,7 @@ public abstract class PigGenericMapBase
}
keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
// till here
-
+
pigReporter = new ProgressableReporter();
// Get the UDF specific context
MapRedUtil.setupUDFContext(job);
@@ -200,26 +202,27 @@ public abstract class PigGenericMapBase
PigSplit split = (PigSplit)context.getInputSplit();
List<OperatorKey> targetOpKeys = split.getTargetOps();
-
+
ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
- for (OperatorKey targetKey : targetOpKeys) {
+ for (OperatorKey targetKey : targetOpKeys) {
targetOpsAsList.add(mp.getOperator(targetKey));
}
roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
- leaf = mp.getLeaves().get(0);
+ leaf = mp.getLeaves().get(0);
}
-
- PigStatusReporter.setContext(context);
-
+
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
-
+
String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
}
-
+
/**
* The map function that attaches the inpTuple appropriately
* and executes the map plan if its not empty. Collects the
@@ -228,9 +231,9 @@ public abstract class PigGenericMapBase
* map-only or map-reduce job to implement. Map-only collects
* the tuple as-is whereas map-reduce collects it after extracting
* the key and indexed tuple.
- */
+ */
@Override
- protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
+ protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
if(!initialized) {
initialized = true;
// cache the collector for use in runPipeline() which
@@ -238,32 +241,31 @@ public abstract class PigGenericMapBase
this.outputCollector = context;
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
-
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+
if (!inIllustrator) {
for (POStore store: stores) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
if (!pigContext.inIllustrator)
store.setUp();
}
}
-
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
- PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
- PhysicalOperator.setPigLogger(pigHadoopLogger);
-
}
-
+
if (mp.isEmpty()) {
collect(context,inpTuple);
return;
}
-
+
for (PhysicalOperator root : roots) {
if (inIllustrator) {
if (root != null) {
@@ -273,7 +275,7 @@ public abstract class PigGenericMapBase
root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
}
}
-
+
runPipeline(leaf);
}
@@ -284,16 +286,16 @@ public abstract class PigGenericMapBase
collect(outputCollector,(Tuple)res.result);
continue;
}
-
+
if(res.returnStatus==POStatus.STATUS_EOP) {
return;
}
-
+
if(res.returnStatus==POStatus.STATUS_NULL)
continue;
-
+
if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
+ // remember that we had an issue so that in
// close() we can do the right thing
errorInMap = true;
// if there is an errmessage use it
@@ -305,19 +307,19 @@ public abstract class PigGenericMapBase
errMsg = "Received Error while " +
"processing the map plan.";
}
-
+
int errCode = 2055;
ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
throw ee;
}
}
-
+
}
abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
abstract public boolean inIllustrator(Context context);
-
+
/**
* @return the keyType
*/
@@ -331,7 +333,7 @@ public abstract class PigGenericMapBase
public void setKeyType(byte keyType) {
this.keyType = keyType;
}
-
+
abstract public Context getIllustratorContext(Configuration conf, DataBag input,
List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
throws IOException, InterruptedException;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Mon Apr 28 22:02:45 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
@@ -344,6 +346,7 @@ public class PigGenericMapReduce {
String msg = "Problem while configuring reduce plan.";
throw new RuntimeException(msg, ioe);
}
+
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
@@ -373,12 +376,11 @@ public class PigGenericMapReduce {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
PhysicalOperator.setPigLogger(pigHadoopLogger);
if (!inIllustrator)
@@ -605,12 +607,11 @@ public class PigGenericMapReduce {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
- PigStatusReporter.setContext(context);
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
PhysicalOperator.setPigLogger(pigHadoopLogger);
for (POStore store: stores) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Mon Apr 28 22:02:45 2014
@@ -35,25 +35,38 @@ import org.apache.pig.tools.pigstats.Pig
* warning messages
*/
public final class PigHadoopLogger implements PigLogger {
- private static class PigHadoopLoggerHelper {
- private static PigHadoopLogger instance = new PigHadoopLogger();
- }
-
- public static PigHadoopLogger getInstance() {
- return PigHadoopLoggerHelper.instance;
- }
private static Log log = LogFactory.getLog(PigHadoopLogger.class);
+ private static PigHadoopLogger logger = null;
private PigStatusReporter reporter = null;
-
private boolean aggregate = false;
-
private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
private PigHadoopLogger() {
}
+ /**
+ * Get the singleton instance of the logger
+ */
+ public static PigHadoopLogger getInstance() {
+ if (logger == null) {
+ logger = new PigHadoopLogger();
+ }
+ return logger;
+ }
+
+ public void destory() {
+ if (reporter != null) {
+ reporter.destory();
+ }
+ reporter = null;
+ }
+
+ public void setReporter(PigStatusReporter reporter) {
+ this.reporter = reporter;
+ }
+
@SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
String className = o.getClass().getName();
@@ -61,15 +74,15 @@ public final class PigHadoopLogger imple
if (getAggregate()) {
if (reporter != null) {
- // log atleast once
+ // log at least once
if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
log.warn(displayMessage);
msgMap.put(o, displayMessage);
}
if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
- reporter.getCounter(className, warningEnum.name()).increment(1);
+ reporter.incrCounter(className, warningEnum.name(), 1);
} else {
- reporter.getCounter(warningEnum).increment(1);
+ reporter.incrCounter(warningEnum, 1);
}
} else {
//TODO:
@@ -87,10 +100,6 @@ public final class PigHadoopLogger imple
}
}
- public synchronized void setReporter(PigStatusReporter rep) {
- this.reporter = rep;
- }
-
public synchronized boolean getAggregate() {
return aggregate;
}
@@ -98,5 +107,4 @@ public final class PigHadoopLogger imple
public synchronized void setAggregate(boolean aggregate) {
this.aggregate = aggregate;
}
-
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Mon Apr 28 22:02:45 2014
@@ -72,10 +72,9 @@ public class PigMapReduceCounter {
try {
PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null) {
- reporter.getCounter(
+ reporter.incrCounter(
JobControlCompiler.PIG_MAP_RANK_NAME
- + context.getJobID().toString(), taskID)
- .increment(1);
+ + context.getJobID().toString(), taskID, 1);
}
} catch (Exception ex) {
log.error("Error on incrementer of PigMapCounter");
@@ -133,10 +132,9 @@ public class PigMapReduceCounter {
if (reporter != null) {
if(leaf instanceof POCounter){
- reporter.getCounter(
+ reporter.incrCounter(
JobControlCompiler.PIG_MAP_RANK_NAME
- + context.getJobID().toString(), taskID).increment(increment);
-
+ + context.getJobID().toString(), taskID, increment);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Mon Apr 28 22:02:45 2014
@@ -56,7 +56,7 @@ public class PigRecordReader extends Rec
private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
private final static String TIMING_COUNTER = "approx_microsecs";
- private final static int TIMING_FREQ = 100;
+ private final static long TIMING_FREQ = 100;
transient private String counterGroup = "";
private boolean doTiming = false;
@@ -214,8 +214,8 @@ public class PigRecordReader extends Rec
}
}
if (timeThis) {
- PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
- ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+ PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+ Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
}
recordCount++;
return true;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Mon Apr 28 22:02:45 2014
@@ -21,15 +21,14 @@ import org.apache.pig.classification.Int
import org.apache.pig.classification.InterfaceStability;
/**
- *
+ *
* An interface to allow aggregation of messages
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface PigLogger {
-
- /**
- * If you have warning messages that need aggregation
- */
- public void warn(Object o, String msg, Enum warningEnum);
+ /**
+ * If you have warning messages that need aggregation
+ */
+ public void warn(Object o, String msg, Enum<?> warningEnum);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Apr 28 22:02:45 2014
@@ -61,7 +61,7 @@ public class POUserFunc extends Expressi
private static final Log LOG = LogFactory.getLog(POUserFunc.class);
private final static String TIMING_COUNTER = "approx_microsecs";
private final static String INVOCATION_COUNTER = "approx_invocations";
- private final static int TIMING_FREQ = 100;
+ private final static long TIMING_FREQ = 100;
private final static TupleFactory tf = TupleFactory.getInstance();
private transient String counterGroup;
@@ -229,8 +229,8 @@ public class POUserFunc extends Expressi
if (knownSize) {
rslt.set(knownIndex++, trslt.get(i));
} else {
- rslt.append(trslt.get(i));
- }
+ rslt.append(trslt.get(i));
+ }
}
continue;
}
@@ -238,8 +238,8 @@ public class POUserFunc extends Expressi
if (knownSize) {
((Tuple)res.result).set(knownIndex++, temp.result);
} else {
- ((Tuple)res.result).append(temp.result);
- }
+ ((Tuple)res.result).append(temp.result);
+ }
}
res.returnStatus = temp.returnStatus;
@@ -273,8 +273,7 @@ public class POUserFunc extends Expressi
boolean timeThis = doTiming && (numInvocations++ % TIMING_FREQ == 0);
if (timeThis) {
startNanos = System.nanoTime();
- PigStatusReporter.getInstance().getCounter(counterGroup, INVOCATION_COUNTER).increment(TIMING_FREQ);
-
+ PigStatusReporter.getInstance().incrCounter(counterGroup, INVOCATION_COUNTER, TIMING_FREQ);
}
try {
if(result.returnStatus == POStatus.STATUS_OK) {
@@ -355,8 +354,8 @@ public class POUserFunc extends Expressi
}
}
if (timeThis) {
- PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
- ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+ PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+ Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
}
return result;
} catch (ExecException ee) {
@@ -404,13 +403,11 @@ public class POUserFunc extends Expressi
@Override
public Result getNextBoolean() throws ExecException {
-
return getNext();
}
@Override
public Result getNextDataByteArray() throws ExecException {
-
return getNext();
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java Mon Apr 28 22:02:45 2014
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledThr
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.MonitoredUDF;
import org.apache.pig.data.Tuple;
@@ -136,20 +135,18 @@ public class MonitoredUDFExecutor implem
@SuppressWarnings("unchecked")
public static void handleError(EvalFunc evalFunc, Exception e) {
evalFunc.getLogger().error(e);
- StatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null &&
- reporter.getCounter(evalFunc.getClass().getName(), e.toString()) != null) {
- reporter.getCounter(evalFunc.getClass().getName(), e.toString()).increment(1L);
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(evalFunc.getClass().getName(), e.toString(), 1);
}
}
@SuppressWarnings("unchecked")
public static void handleTimeout(EvalFunc evalFunc, Exception e) {
evalFunc.getLogger().error(e);
- StatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null &&
- reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout") != null) {
- reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout").increment(1L);
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout", 1);
}
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Apr 28 22:02:45 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -39,11 +40,13 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.Event;
@@ -67,7 +70,7 @@ public class PigProcessor implements Log
private PhysicalOperator leaf;
private Configuration conf;
- private PigTezLogger pigTezLogger;
+ private PigHadoopLogger pigHadoopLogger;
public static String sampleVertex;
public static Map<String, Object> sampleMap;
@@ -102,9 +105,12 @@ public class PigProcessor implements Log
PigMapReduce.sJobConfInternal.set(conf);
boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
-
- pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
- PhysicalOperator.setPigLogger(pigTezLogger);
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ pigStatusReporter.setContext(new TaskContext<TezProcessorContext>(processorContext));
+ pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
for (TezTaskConfigurable tezTC : tezTCs){
@@ -132,9 +138,9 @@ public class PigProcessor implements Log
conf = null;
sampleMap = null;
sampleVertex = null;
- if (pigTezLogger != null) {
- pigTezLogger.destroy();
- pigTezLogger = null;
+ if (pigHadoopLogger != null) {
+ pigHadoopLogger.destory();
+ pigHadoopLogger = null;
}
}
Modified: pig/branches/tez/src/org/apache/pig/tools/counters/PigCounterHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/counters/PigCounterHelper.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/counters/PigCounterHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/counters/PigCounterHelper.java Mon Apr 28 22:02:45 2014
@@ -20,7 +20,6 @@ package org.apache.pig.tools.counters;
import java.util.Map;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -33,53 +32,51 @@ import com.google.common.collect.Maps;
* stored counters each time it does.
*/
public class PigCounterHelper {
- private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
- private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
+ private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
+ private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
- /**
- * Mocks the Reporter.incrCounter, but adds buffering.
- * See org.apache.hadoop.mapred.Reporter's incrCounter.
- */
- public void incrCounter(String group, String counterName, long incr) {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null) { // common case
- Counter counter = reporter.getCounter(group, counterName);
- if (counter != null) {
- counter.increment(incr);
-
- if (counterStringMap_.size() > 0) {
- for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
- reporter.getCounter(entry.getKey().first, entry.getKey().second).increment(entry.getValue());
- }
- counterStringMap_.clear();
+ /**
+ * Mocks the Reporter.incrCounter, but adds buffering.
+ * See org.apache.hadoop.mapred.Reporter's incrCounter.
+ */
+ public void incrCounter(String group, String counterName, long incr) {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null && reporter.incrCounter(group, counterName, incr)) { // common case
+ if (counterStringMap_.size() > 0) {
+ for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
+ reporter.incrCounter(entry.getKey().first, entry.getKey().second, entry.getValue());
+ }
+ counterStringMap_.clear();
+ }
+ return;
}
- return;
- }
+
+ // In the case when reporter is not available, or we can't get the
+ // Counter, store in the local map.
+ Pair<String, String> key = new Pair<String, String>(group, counterName);
+ Long currentValue = counterStringMap_.get(key);
+ counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
}
- // In the case when reporter is not available, or we can't get the Counter,
- // store in the local map.
- Pair<String, String> key = new Pair<String, String>(group, counterName);
- Long currentValue = counterStringMap_.get(key);
- counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
- }
-
- /**
- * Mocks the Reporter.incrCounter, but adds buffering.
- * See org.apache.hadoop.mapred.Reporter's incrCounter.
- */
- public void incrCounter(Enum<?> key, long incr) {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null && reporter.getCounter(key) != null) {
- reporter.getCounter(key).increment(incr);
- if (counterEnumMap_.size() > 0) {
- for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
- reporter.getCounter(entry.getKey()).increment(entry.getValue());
+
+ /**
+ * Mocks the Reporter.incrCounter, but adds buffering.
+ * See org.apache.hadoop.mapred.Reporter's incrCounter.
+ */
+ public void incrCounter(Enum<?> key, long incr) {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null && reporter.incrCounter(key, incr)) { // common case
+ if (counterEnumMap_.size() > 0) {
+ for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
+ reporter.getCounter(entry.getKey()).increment(entry.getValue());
+ }
+ counterEnumMap_.clear();
+ }
+ return;
}
- counterEnumMap_.clear();
- }
- } else { // buffer the increments
- Long currentValue = counterEnumMap_.get(key);
- counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
+
+ // In the case when reporter is not available, or we can't get the
+ // Counter, store in the local map.
+ Long currentValue = counterEnumMap_.get(key);
+ counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
}
- }
}
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1590822&r1=1590821&r2=1590822&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Mon Apr 28 22:02:45 2014
@@ -20,80 +20,85 @@ package org.apache.pig.tools.pigstats;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.Progressable;
-import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
-@SuppressWarnings("unchecked")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class PigStatusReporter extends StatusReporter implements Progressable {
- private TaskInputOutputContext context;
- private FetchContext fetchContext;
-
private static PigStatusReporter reporter = null;
+ private TaskContext<?> context = null;
+
+ private PigStatusReporter() {
+ }
+
/**
* Get the singleton instance of the reporter
*/
public static PigStatusReporter getInstance() {
if (reporter == null) {
- reporter = new PigStatusReporter(null);
+ reporter = new PigStatusReporter();
}
return reporter;
}
- public static void setContext(TaskInputOutputContext context) {
- reporter = new PigStatusReporter(context);
+ public void destory() {
+ context = null;
}
- private PigStatusReporter(TaskInputOutputContext context) {
+ public void setContext(TaskContext<?> context) {
this.context = context;
}
+ /**
+ * @deprecated use {@link org.apache.pig.tools.pigstats.PigStatusReporter#incrCounter} instead.
+ * This method returns MR counter which is not compatible with Tez mode. Use
+ * incrCounter() that is compatible with both MR and Tez mode.
+ */
@Override
+ @Deprecated
public Counter getCounter(Enum<?> name) {
- if (fetchContext != null) {
- return fetchContext.getCounter(name);
- }
- return (context == null) ? null : context.getCounter(name);
+ return (context == null) ? null : context.getCounter(name);
}
+ /**
+ * @deprecated use {@link org.apache.pig.tools.pigstats.PigStatusReporter#incrCounter} instead.
+ * This method returns MR counter which is not compatible with Tez mode. Use
+ * incrCounter() that is compatible with both MR and Tez mode.
+ */
@Override
+ @Deprecated
public Counter getCounter(String group, String name) {
- if (fetchContext != null) {
- return fetchContext.getCounter(group, name);
- }
- return (context == null) ? null : context.getCounter(group, name);
+ return context == null ? null : context.getCounter(group, name);
+ }
+
+ public boolean incrCounter(Enum<?> name, long incr) {
+ return context == null ? false : context.incrCounter(name, incr);
+ }
+
+ public boolean incrCounter(String group, String name, long incr) {
+ return context == null ? false : context.incrCounter(group, name, incr);
}
@Override
public void progress() {
- if (fetchContext == null && context != null) {
+ if (context != null) {
context.progress();
}
}
@Override
public void setStatus(String status) {
- if (fetchContext == null && context != null) {
+ if (context != null) {
context.setStatus(status);
}
}
public float getProgress() {
- return 0;
+ return context == null ? 0f : context.getProgress();
}
-
- /**
- * Sets a dummy counter handler for fetch tasks
- * @param fetchContext
- */
- public void setFetchContext(FetchContext fetchContext) {
- this.fetchContext = fetchContext;
- }
-
}