You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/05/02 03:08:37 UTC
svn commit: r1591802 - in /pig/trunk: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/p...
Author: cheolsoo
Date: Fri May 2 01:08:37 2014
New Revision: 1591802
URL: http://svn.apache.org/r1591802
Log:
PIG-3914: Change TaskContext to abstract class (cheolsoo)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java
Removed:
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri May 2 01:08:37 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3914: Change TaskContext to abstract class (cheolsoo)
+
PIG-3672: Pig should not check for hardcoded file system implementations (rohini)
PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java?rev=1591802&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java Fri May 2 01:08:37 2014
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Execution-engine-neutral handle on the context a task is running in.
+ * <p>
+ * Concrete subclasses wrap an engine-specific context object of type
+ * {@code T} and route counter lookups and updates through it. The
+ * progress and status hooks are no-ops here; subclasses override them
+ * when the wrapped context can report progress or status.
+ *
+ * @param <T> type of the wrapped engine-specific context
+ */
+public abstract class TaskContext<T> {
+
+    /**
+     * @return the wrapped engine-specific context (may be {@code null})
+     */
+    public abstract T get();
+
+    /**
+     * Looks up the counter identified by an enum constant.
+     *
+     * @return the counter, or {@code null} if none is available
+     */
+    public abstract Counter getCounter(Enum<?> name);
+
+    /**
+     * Looks up the counter identified by {@code group} and {@code name}.
+     *
+     * @return the counter, or {@code null} if none is available
+     */
+    public abstract Counter getCounter(String group, String name);
+
+    /**
+     * Adds {@code delta} to the counter identified by an enum constant.
+     *
+     * @return {@code true} if the increment was applied
+     */
+    public abstract boolean incrCounter(Enum<?> name, long delta);
+
+    /**
+     * Adds {@code delta} to the counter identified by {@code group} and
+     * {@code name}.
+     *
+     * @return {@code true} if the increment was applied
+     */
+    public abstract boolean incrCounter(String group, String name, long delta);
+
+    /** Progress notification hook. No-op by default. */
+    public void progress() {
+        // Intentionally empty; subclasses override when supported.
+    }
+
+    /** Returns the task's progress; this default always returns 0. */
+    public float getProgress() {
+        return 0.0f;
+    }
+
+    /** Sets the task's status message. No-op by default. */
+    public void setStatus(String status) {
+        // Intentionally empty; subclasses override when supported.
+    }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri May 2 01:08:37 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -36,7 +37,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
@@ -148,7 +148,7 @@ public class FetchLauncher {
boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
- pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
+ pigStatusReporter.setContext(new FetchTaskContext(new FetchContext()));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java?rev=1591802&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java Fri May 2 01:08:37 2014
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
+
+/**
+ * {@link TaskContext} backed by a {@link FetchContext}; installed as the
+ * {@code PigStatusReporter} context by {@code FetchLauncher}. Progress and
+ * status calls fall through to the no-op defaults in {@link TaskContext}.
+ */
+public class FetchTaskContext extends TaskContext<FetchContext> {
+    // May be null; every counter operation then degrades to a no-op.
+    private final FetchContext context;
+
+    public FetchTaskContext(FetchContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public FetchContext get() {
+        return context;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+        if (context == null) {
+            return null;
+        }
+        return context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+        if (context == null) {
+            return null;
+        }
+        return context.getCounter(group, name);
+    }
+
+    @Override
+    public boolean incrCounter(Enum<?> name, long delta) {
+        return increment(getCounter(name), delta);
+    }
+
+    @Override
+    public boolean incrCounter(String group, String name, long delta) {
+        return increment(getCounter(group, name), delta);
+    }
+
+    /**
+     * Adds {@code delta} to {@code counter}. Returns {@code false} instead
+     * of throwing when there is no counter (no context, or the lookup
+     * returned {@code null}).
+     */
+    private static boolean increment(Counter counter, long delta) {
+        if (counter == null) {
+            return false;
+        }
+        counter.increment(delta);
+        return true;
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java?rev=1591802&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java Fri May 2 01:08:37 2014
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
+
+/**
+ * {@link TaskContext} backed by a Hadoop MapReduce
+ * {@link TaskInputOutputContext}. Counter, progress and status calls are
+ * delegated to the wrapped context and degrade to no-ops when it is
+ * {@code null}. {@link #getProgress()} is not delegated and keeps the
+ * default inherited from {@link TaskContext}.
+ */
+public class MRTaskContext extends TaskContext<TaskInputOutputContext<?,?,?,?>> {
+    private final TaskInputOutputContext<?,?,?,?> context;
+
+    public MRTaskContext(TaskInputOutputContext<?,?,?,?> context) {
+        this.context = context;
+    }
+
+    @Override
+    public TaskInputOutputContext<?,?,?,?> get() {
+        return context;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+        if (context == null) {
+            return null;
+        }
+        return context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+        if (context == null) {
+            return null;
+        }
+        return context.getCounter(group, name);
+    }
+
+    @Override
+    public boolean incrCounter(Enum<?> name, long delta) {
+        return increment(getCounter(name), delta);
+    }
+
+    @Override
+    public boolean incrCounter(String group, String name, long delta) {
+        return increment(getCounter(group, name), delta);
+    }
+
+    @Override
+    public void progress() {
+        if (context != null) {
+            context.progress();
+        }
+    }
+
+    @Override
+    public void setStatus(String status) {
+        if (context != null) {
+            context.setStatus(status);
+        }
+    }
+
+    /**
+     * Adds {@code delta} to {@code counter}. Returns {@code false} instead
+     * of throwing when there is no counter (no context, or the lookup
+     * returned {@code null}).
+     */
+    private static boolean increment(Counter counter, long delta) {
+        if (counter == null) {
+            return false;
+        }
+        counter.increment(delta);
+        return true;
+    }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri May 2 01:08:37 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -29,8 +28,7 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
-import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
* This class is used to have a POStore write to DFS via a output
@@ -51,7 +49,7 @@ public class MapReducePOStoreImpl extend
// not affect the caller's copy
Configuration outputConf = new Configuration(context.getConfiguration());
reporter = PigStatusReporter.getInstance();
- reporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ reporter.setContext(new MRTaskContext(context));
// make a copy of the Context to use here - since in the same
// task (map or reduce) we could have multiple stores, we should
@@ -113,9 +111,7 @@ public class MapReducePOStoreImpl extend
}
}
- public Counter createRecordCounter(POStore store) {
- String name = MRPigStatsUtil.getMultiStoreCounterName(store);
- return (name == null) ? null : reporter.getCounter(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
+ public void incrRecordCounter(String name, long incr) {
+ reporter.incrCounter(PigStatsUtil.MULTI_STORE_COUNTER_GROUP, name, incr);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri May 2 01:08:37 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -38,7 +37,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
@@ -142,7 +140,7 @@ public class PigCombiner {
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
- pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri May 2 01:08:37 2014
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -42,7 +41,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.SchemaTupleBackend;
@@ -159,7 +157,7 @@ public abstract class PigGenericMapBase
*/
@SuppressWarnings("unchecked")
@Override
- public void setup(Context context) throws IOException, InterruptedException {
+ public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration job = context.getConfiguration();
@@ -212,7 +210,7 @@ public abstract class PigGenericMapBase
}
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
- pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ pigStatusReporter.setContext(new MRTaskContext(context));
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
@@ -244,7 +242,7 @@ public abstract class PigGenericMapBase
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
- pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri May 2 01:08:37 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -43,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
- pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
- pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+ pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri May 2 01:08:37 2014
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -36,17 +35,17 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
* A wrapper around the actual RecordReader and loadfunc - this is needed for
* two reasons
* 1) To intercept the initialize call from hadoop and initialize the underlying
- * actual RecordReader with the right Context object - this is achieved by
- * looking up the Context corresponding to the input split this Reader is
+ * actual RecordReader with the right Context object - this is achieved by
+ * looking up the Context corresponding to the input split this Reader is
* supposed to process
- * 2) We need to give hadoop consistent key-value types - text and tuple
+ * 2) We need to give hadoop consistent key-value types - text and tuple
* respectively - so PigRecordReader will call underlying Loader's getNext() to
* get the Tuple value - the key is null text since key is not used in input to
* map() in Pig.
@@ -66,55 +65,54 @@ public class PigRecordReader extends Rec
* {@link LoadFunc#getNext()}
*/
Tuple curValue = null;
-
+
// the current wrapped RecordReader used by the loader
- @SuppressWarnings("unchecked")
- private RecordReader curReader;
-
+ private RecordReader<?, ?> curReader;
+
// the loader object
private LoadFunc loadfunc;
-
- // the Hadoop counter for multi-input jobs
- transient private Counter inputRecordCounter = null;
-
+
// the Hadoop counter name
transient private String counterName = null;
-
+
// the wrapped inputformat
- private InputFormat inputformat;
-
+ private InputFormat<?, ?> inputformat;
+
// the wrapped splits
private PigSplit pigSplit;
-
+
// the wrapped split index in use
private int idx;
-
+
private long progress;
-
+
private TaskAttemptContext context;
-
+
+ private PigStatusReporter reporter;
+
private final long limit;
private long recordCount = 0;
-
+
/**
* the Configuration object with data specific to the input the underlying
- * RecordReader will process (this is obtained after a
- * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
+ * RecordReader will process (this is obtained after a
+ * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
* call and hence can contain specific properties the underlying
* {@link InputFormat} might have put in.
*/
private Configuration inputSpecificConf;
/**
- * @param context
- *
+ * @param context
+ *
*/
- public PigRecordReader(InputFormat inputformat, PigSplit pigSplit,
+ public PigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit,
LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
this.inputformat = inputformat;
- this.pigSplit = pigSplit;
+ this.pigSplit = pigSplit;
this.loadfunc = loadFunc;
this.context = context;
+ this.reporter = PigStatusReporter.getInstance();
this.inputSpecificConf = context.getConfiguration();
curReader = null;
progress = 0;
@@ -124,7 +122,7 @@ public class PigRecordReader extends Rec
counterGroup = loadFunc.toString();
doTiming = context.getConfiguration().getBoolean(TIME_UDFS_PROP, false);
}
-
+
@Override
public void close() throws IOException {
if (curReader != null) {
@@ -141,23 +139,11 @@ public class PigRecordReader extends Rec
}
@Override
- public Tuple getCurrentValue() throws IOException, InterruptedException {
- if (inputRecordCounter == null && counterName != null) {
- PigStatusReporter reporter = PigStatusReporter.getInstance();
- if (reporter != null) {
- inputRecordCounter = reporter.getCounter(
- MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
- counterName);
- LOG.info("Created input record counter: " + counterName);
- } else {
- LOG.warn("Get null reporter for " + counterName);
- }
- }
+ public Tuple getCurrentValue() throws IOException, InterruptedException {
// Increment the multi-input record counter
- if (inputRecordCounter != null && curValue != null) {
- inputRecordCounter.increment(1);
+ if (counterName != null && curValue != null) {
+ reporter.incrCounter(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, counterName, 1);
}
-
return curValue;
}
@@ -174,8 +160,8 @@ public class PigRecordReader extends Rec
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
- // initialize the underlying actual RecordReader with the right Context
- // object - this is achieved by merging the Context corresponding to
+ // initialize the underlying actual RecordReader with the right Context
+ // object - this is achieved by merging the Context corresponding to
// the input split this Reader is supposed to process with the context
// passed in.
this.pigSplit = (PigSplit)split;
@@ -184,7 +170,7 @@ public class PigRecordReader extends Rec
inputSpecificConf);
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
- PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(),
+ PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(),
context.getConfiguration());
// now invoke initialize() on underlying RecordReader with
// the "adjusted" conf
@@ -192,9 +178,13 @@ public class PigRecordReader extends Rec
curReader.initialize(pigSplit.getWrappedSplit(), context);
loadfunc.prepareToRead(curReader, pigSplit);
}
-
- if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) {
+ if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) {
counterName = getMultiInputsCounerName(pigSplit, inputSpecificConf);
+ if (counterName != null) {
+ // Create the counter. This is needed because incrCounter() may
+ // never be called in case of empty file.
+ reporter.incrCounter(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, counterName, 0);
+ }
}
}
@@ -214,7 +204,7 @@ public class PigRecordReader extends Rec
}
}
if (timeThis) {
- PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+ reporter.incrCounter(counterGroup, TIMING_COUNTER,
Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
}
recordCount++;
@@ -224,13 +214,13 @@ public class PigRecordReader extends Rec
@SuppressWarnings("unchecked")
private static String getMultiInputsCounerName(PigSplit pigSplit,
Configuration conf) throws IOException {
- ArrayList<FileSpec> inputs =
+ ArrayList<FileSpec> inputs =
(ArrayList<FileSpec>) ObjectSerializer.deserialize(
conf.get(PigInputFormat.PIG_INPUTS));
String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
- return MRPigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
+ return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
}
-
+
/**
* Get the record reader for the next chunk in this CombineFileSplit.
*/
@@ -252,7 +242,7 @@ public class PigRecordReader extends Rec
// get a record reader for the idx-th chunk
try {
-
+
pigSplit.setCurrentIdx(idx);
curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(), context);
LOG.info("Current split being processed "+pigSplit.getWrappedSplit());
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Fri May 2 01:08:37 2014
@@ -19,9 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
-import java.util.LinkedList;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.PigException;
import org.apache.pig.SortInfo;
import org.apache.pig.StoreFuncInterface;
@@ -37,9 +35,9 @@ import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
/**
* The store operator which is used in two ways:
@@ -53,37 +51,36 @@ public class POStore extends PhysicalOpe
private static final long serialVersionUID = 1L;
private static Result empty = new Result(POStatus.STATUS_NULL, null);
- transient private StoreFuncInterface storer;
+ transient private StoreFuncInterface storer;
transient private POStoreImpl impl;
+ transient private String counterName = null;
private FileSpec sFile;
private Schema schema;
-
- transient private Counter outputRecordCounter = null;
// flag to distinguish user stores from MRCompiler stores.
private boolean isTmpStore;
-
+
// flag to distinguish single store from multiquery store.
private boolean isMultiStore;
-
+
// flag to indicate if the custom counter should be disabled.
private boolean disableCounter = false;
-
+
// the index of multiquery store to track counters
private int index;
-
+
// If we know how to reload the store, here's how. The lFile
// FileSpec is set in PigServer.postProcess. It can be used to
// reload this store, if the optimizer has the need.
private FileSpec lFile;
-
+
// if the predecessor of store is Sort (order by)
- // then sortInfo will have information of the sort
+ // then sortInfo will have information of the sort
// column names and the asc/dsc info
private SortInfo sortInfo;
-
+
private String signature;
-
+
public POStore(OperatorKey k) {
this(k, -1, null);
}
@@ -91,11 +88,11 @@ public class POStore extends PhysicalOpe
public POStore(OperatorKey k, int rp) {
this(k, rp, null);
}
-
+
public POStore(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
}
-
+
/**
* Set up the storer
* @throws IOException
@@ -105,17 +102,22 @@ public class POStore extends PhysicalOpe
try{
storer = impl.createStoreFunc(this);
if (!isTmpStore && !disableCounter && impl instanceof MapReducePOStoreImpl) {
- outputRecordCounter =
- ((MapReducePOStoreImpl) impl).createRecordCounter(this);
+ counterName = PigStatsUtil.getMultiStoreCounterName(this);
+ if (counterName != null) {
+ // Create the counter. This is needed because
+ // incrCounter() may never be called in case of empty
+ // file.
+ ((MapReducePOStoreImpl) impl).incrRecordCounter(counterName, 0);
+ }
}
}catch (IOException ioe) {
int errCode = 2081;
- String msg = "Unable to setup the store function.";
+ String msg = "Unable to setup the store function.";
throw new ExecException(msg, errCode, PigException.BUG, ioe);
}
}
}
-
+
/**
* Called at the end of processing for clean up.
* @throws IOException
@@ -125,7 +127,7 @@ public class POStore extends PhysicalOpe
impl.tearDown();
}
}
-
+
/**
* To perform cleanup when there is an error.
* @throws IOException
@@ -135,7 +137,7 @@ public class POStore extends PhysicalOpe
impl.cleanUp();
}
}
-
+
@Override
public Result getNextTuple() throws ExecException {
Result res = processInput();
@@ -148,8 +150,8 @@ public class POStore extends PhysicalOpe
illustratorMarkup(res.result, res.result, 0);
res = empty;
- if (outputRecordCounter != null) {
- outputRecordCounter.increment(1);
+ if (counterName != null) {
+ ((MapReducePOStoreImpl) impl).incrRecordCounter(counterName, 1);
}
break;
case POStatus.STATUS_EOP:
@@ -206,11 +208,11 @@ public class POStore extends PhysicalOpe
public FileSpec getInputSpec() {
return lFile;
}
-
+
public void setIsTmpStore(boolean tmp) {
isTmpStore = tmp;
}
-
+
public boolean isTmpStore() {
return isTmpStore;
}
@@ -222,12 +224,12 @@ public class POStore extends PhysicalOpe
public void setSchema(Schema schema) {
this.schema = schema;
}
-
+
public Schema getSchema() {
return schema;
}
-
-
+
+
public StoreFuncInterface getStoreFunc() {
if(storer == null){
storer = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
@@ -235,7 +237,7 @@ public class POStore extends PhysicalOpe
}
return storer;
}
-
+
/**
* @param sortInfo the sortInfo to set
*/
@@ -249,11 +251,11 @@ public class POStore extends PhysicalOpe
public SortInfo getSortInfo() {
return sortInfo;
}
-
+
public String getSignature() {
return signature;
}
-
+
public void setSignature(String signature) {
this.signature = signature;
}
@@ -265,7 +267,7 @@ public class POStore extends PhysicalOpe
public boolean isMultiStore() {
return isMultiStore;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Fri May 2 01:08:37 2014
@@ -21,7 +21,7 @@ package org.apache.pig.tools.pigstats;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.util.Progressable;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;