You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/29 23:18:29 UTC
svn commit: r1628317 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/
src/org/apache/pig/backend/hadoop/executionengine/util/
Author: rohini
Date: Mon Sep 29 21:18:28 2014
New Revision: 1628317
URL: http://svn.apache.org/r1628317
Log:
PIG-4202: Reset UDFContext state before OutputCommitter invocations in Tez (rohini)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 29 21:18:28 2014
@@ -92,6 +92,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4202: Reset UDFContext state before OutputCommitter invocations in Tez (rohini)
+
PIG-4205: e2e test property-check does not check all prerequisites (kellyzly via daijy)
PIG-4180: e2e test Native_3 fail on Hadoop 2 (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon Sep 29 21:18:28 2014
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
@@ -37,7 +36,6 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -50,19 +48,19 @@ import org.apache.pig.impl.util.ObjectSe
*/
@SuppressWarnings("unchecked")
public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
-
+
private enum Mode { SINGLE_STORE, MULTI_STORE};
-
+
/** the temporary directory for the multi store */
public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir";
/** the relative path that can be used to build a temporary
* place to store the output from a number of map-reduce tasks*/
public static final String PIG_TMP_PATH = "pig.tmp.path";
-
- List<POStore> reduceStores = null;
- List<POStore> mapStores = null;
- Configuration currentConf = null;
-
+
+ protected List<POStore> reduceStores = null;
+ protected List<POStore> mapStores = null;
+ protected Configuration currentConf = null;
+
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
throws IOException, InterruptedException {
@@ -97,27 +95,27 @@ public class PigOutputFormat extends Out
@SuppressWarnings("unchecked")
static public class PigRecordWriter
extends RecordWriter<WritableComparable, Tuple> {
-
+
/**
* the actual RecordWriter
*/
private RecordWriter wrappedWriter;
-
+
/**
* the StoreFunc for the single store
*/
private StoreFuncInterface sFunc;
-
+
/**
* Single Query or multi query
*/
private Mode mode;
-
- public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
+
+ public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
Mode mode)
- throws IOException {
+ throws IOException {
this.mode = mode;
-
+
if(mode == Mode.SINGLE_STORE) {
this.wrappedWriter = wrappedWriter;
this.sFunc = sFunc;
@@ -128,7 +126,7 @@ public class PigOutputFormat extends Out
/**
* We only care about the values, so we are going to skip the keys when
* we write.
- *
+ *
* @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object)
*/
@Override
@@ -142,7 +140,7 @@ public class PigOutputFormat extends Out
}
@Override
- public void close(TaskAttemptContext taskattemptcontext) throws
+ public void close(TaskAttemptContext taskattemptcontext) throws
IOException, InterruptedException {
if(mode == Mode.SINGLE_STORE) {
wrappedWriter.close(taskattemptcontext);
@@ -150,24 +148,24 @@ public class PigOutputFormat extends Out
}
}
-
+
/**
* Before delegating calls to underlying OutputFormat or OutputCommitter
* Pig needs to ensure the Configuration in the JobContext contains
- * the output location and StoreFunc
- * for the specific store - so set these up in the context for this specific
+ * the output location and StoreFunc
+ * for the specific store - so set these up in the context for this specific
* store
* @param jobContext the {@link JobContext}
* @param store the POStore
* @throws IOException on failure
*/
- public static void setLocation(JobContext jobContext, POStore store) throws
+ public static void setLocation(JobContext jobContext, POStore store) throws
IOException {
Job storeJob = new Job(jobContext.getConfiguration());
StoreFuncInterface storeFunc = store.getStoreFunc();
String outputLocation = store.getSFile().getFileName();
storeFunc.setStoreLocation(outputLocation, storeJob);
-
+
// the setStoreLocation() method would indicate to the StoreFunc
// to set the output location for its underlying OutputFormat.
// Typically OutputFormat's store the output location in the
@@ -176,7 +174,7 @@ public class PigOutputFormat extends Out
// OutputFormat might have set) and merge it with the Configuration
// we started with so that when this method returns the Configuration
// supplied as input has the updates.
- ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
+ ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
storeJob.getConfiguration());
}
@@ -187,20 +185,20 @@ public class PigOutputFormat extends Out
checkOutputSpecsHelper(reduceStores, jobcontext);
}
- private void checkOutputSpecsHelper(List<POStore> stores, JobContext
+ private void checkOutputSpecsHelper(List<POStore> stores, JobContext
jobcontext) throws IOException, InterruptedException {
for (POStore store : stores) {
// make a copy of the original JobContext so that
- // each OutputFormat get a different copy
+ // each OutputFormat get a different copy
JobContext jobContextCopy = HadoopShims.createJobContext(
jobcontext.getConfiguration(), jobcontext.getJobID());
-
+
// set output location
PigOutputFormat.setLocation(jobContextCopy, store);
-
+
StoreFuncInterface sFunc = store.getStoreFunc();
OutputFormat of = sFunc.getOutputFormat();
-
+
// The above call should have update the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
try {
@@ -224,23 +222,22 @@ public class PigOutputFormat extends Out
* @param currentConf2
* @param storeLookupKey
* @return
- * @throws IOException
+ * @throws IOException
*/
- private List<POStore> getStores(Configuration conf, String storeLookupKey)
+ private List<POStore> getStores(Configuration conf, String storeLookupKey)
throws IOException {
return (List<POStore>)ObjectSerializer.deserialize(
conf.get(storeLookupKey));
}
-
-
- private void setupUdfEnvAndStores(JobContext jobcontext)
+
+ protected void setupUdfEnvAndStores(JobContext jobcontext)
throws IOException{
Configuration newConf = jobcontext.getConfiguration();
-
- // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside
+
+ // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside
// construct of PigOutputCommitter, can make use of it
MapRedUtil.setupUDFContext(newConf);
-
+
// if there is a udf in the plan we would need to know the import
// path so we can instantiate the udf. This is required because
// we will be deserializing the POStores out of the plan in the next
@@ -261,13 +258,13 @@ public class PigOutputFormat extends Out
// config properties have changed (eg. creating stores).
currentConf = new Configuration(newConf);
}
-
+
/**
* Check if given property prop is same in configurations conf1, conf2
* @param prop
* @param conf1
* @param conf2
- * @return true if both are equal
+ * @return true if both are equal
*/
private boolean isConfPropEqual(String prop, Configuration conf1,
Configuration conf2) {
@@ -283,10 +280,10 @@ public class PigOutputFormat extends Out
}
@Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext
+ public OutputCommitter getOutputCommitter(TaskAttemptContext
taskattemptcontext) throws IOException, InterruptedException {
setupUdfEnvAndStores(taskattemptcontext);
-
+
// we return an instance of PigOutputCommitter to Hadoop - this instance
// will wrap the real OutputCommitter(s) belonging to the store(s)
return new PigOutputCommitter(taskattemptcontext,
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Sep 29 21:18:28 2014
@@ -19,6 +19,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
@@ -101,6 +103,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
@@ -550,7 +553,7 @@ public class TezDagBuilder extends TezOp
}
}
}
- JobControlCompiler.setOutputFormat(job);
+ setOutputFormat(job);
// set parent plan in all operators. currently the parent plan is really
// used only when POStream, POSplit are present in the plan
@@ -1011,4 +1014,25 @@ public class TezDagBuilder extends TezOp
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS,
comparatorClass);
}
+
+ private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
+ // the OutputFormat we report to Hadoop is always PigOutputFormat which
+ // can be wrapped with LazyOutputFormat provided if it is supported by
+ // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+ try {
+ Class<?> clazz = PigContext
+ .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ Method method = clazz.getMethod("setOutputFormatClass",
+ org.apache.hadoop.mapreduce.Job.class, Class.class);
+ method.invoke(null, job, PigOutputFormatTez.class);
+ } catch (Exception e) {
+ job.setOutputFormatClass(PigOutputFormatTez.class);
+ log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+ }
+ } else {
+ job.setOutputFormatClass(PigOutputFormatTez.class);
+ }
+ }
}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java?rev=1628317&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java Mon Sep 29 21:18:28 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.util.UDFContext;
+
+public class PigOutputFormatTez extends PigOutputFormat {
+
+ @Override
+ public OutputCommitter getOutputCommitter(
+ TaskAttemptContext taskattemptcontext) throws IOException,
+ InterruptedException {
+ setupUdfEnvAndStores(taskattemptcontext);
+
+ // we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop - this instance
+ // will wrap the real OutputCommitter(s) belonging to the store(s)
+ return new PigOutputCommitterTez(taskattemptcontext,
+ mapStores,
+ reduceStores);
+ }
+
+ public static class PigOutputCommitterTez extends PigOutputCommitter {
+
+ public PigOutputCommitterTez(TaskAttemptContext context,
+ List<POStore> mapStores, List<POStore> reduceStores)
+ throws IOException {
+ super(context, mapStores, reduceStores);
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ cleanupForContainerReuse();
+ try {
+ super.setupJob(context);
+ } finally {
+ cleanupForContainerReuse();
+ }
+
+ }
+
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ cleanupForContainerReuse();
+ try {
+ super.commitJob(context);
+ } finally {
+ cleanupForContainerReuse();
+ }
+ }
+
+ @Override
+ public void abortJob(JobContext context, State state)
+ throws IOException {
+ cleanupForContainerReuse();
+ try {
+ super.abortJob(context, state);
+ } finally {
+ cleanupForContainerReuse();
+ }
+ }
+
+ private void cleanupForContainerReuse() {
+ UDFContext.getUDFContext().reset();
+ }
+
+ }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Sep 29 21:18:28 2014
@@ -172,7 +172,7 @@ public class MapRedUtil {
UDFContext udfc = UDFContext.getUDFContext();
udfc.addJobConf(job);
// don't deserialize in front-end
- if (!udfc.isFrontend()) {
+ if (udfc.isUDFConfEmpty()) {
udfc.deserialize();
}
}