You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/29 23:18:29 UTC

svn commit: r1628317 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ src/org/apache/pig/backend/...

Author: rohini
Date: Mon Sep 29 21:18:28 2014
New Revision: 1628317

URL: http://svn.apache.org/r1628317
Log:
PIG-4202: Reset UDFContext state before OutputCommitter invocations in Tez (rohini)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 29 21:18:28 2014
@@ -92,6 +92,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4202: Reset UDFContext state before OutputCommitter invocations in Tez (rohini)
+
 PIG-4205: e2e test property-check does not check all prerequisites (kellyzly via daijy)
 
 PIG-4180: e2e test Native_3 fail on Hadoop 2 (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon Sep 29 21:18:28 2014
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.Job;
@@ -37,7 +36,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -50,19 +48,19 @@ import org.apache.pig.impl.util.ObjectSe
  */
 @SuppressWarnings("unchecked")
 public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
-    
+
     private enum Mode { SINGLE_STORE, MULTI_STORE};
-    
+
     /** the temporary directory for the multi store */
     public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir";
     /** the relative path that can be used to build a temporary
      * place to store the output from a number of map-reduce tasks*/
     public static final String PIG_TMP_PATH =  "pig.tmp.path";
-    
-    List<POStore> reduceStores = null;
-    List<POStore> mapStores = null;
-    Configuration currentConf = null;
-    
+
+    protected List<POStore> reduceStores = null;
+    protected List<POStore> mapStores = null;
+    protected Configuration currentConf = null;
+
     @Override
     public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
                 throws IOException, InterruptedException {
@@ -97,27 +95,27 @@ public class PigOutputFormat extends Out
     @SuppressWarnings("unchecked")
     static public class PigRecordWriter
             extends RecordWriter<WritableComparable, Tuple> {
-        
+
         /**
          * the actual RecordWriter
          */
         private RecordWriter wrappedWriter;
-        
+
         /**
          * the StoreFunc for the single store
          */
         private StoreFuncInterface sFunc;
-        
+
         /**
          * Single Query or multi query
          */
         private Mode mode;
-        
-        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc, 
+
+        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
                 Mode mode)
-                throws IOException {            
+                throws IOException {
             this.mode = mode;
-            
+
             if(mode == Mode.SINGLE_STORE) {
                 this.wrappedWriter = wrappedWriter;
                 this.sFunc = sFunc;
@@ -128,7 +126,7 @@ public class PigOutputFormat extends Out
         /**
          * We only care about the values, so we are going to skip the keys when
          * we write.
-         * 
+         *
          * @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object)
          */
         @Override
@@ -142,7 +140,7 @@ public class PigOutputFormat extends Out
         }
 
         @Override
-        public void close(TaskAttemptContext taskattemptcontext) throws 
+        public void close(TaskAttemptContext taskattemptcontext) throws
         IOException, InterruptedException {
             if(mode == Mode.SINGLE_STORE) {
                 wrappedWriter.close(taskattemptcontext);
@@ -150,24 +148,24 @@ public class PigOutputFormat extends Out
         }
 
     }
-    
+
     /**
      * Before delegating calls to underlying OutputFormat or OutputCommitter
      * Pig needs to ensure the Configuration in the JobContext contains
-     * the output location and StoreFunc 
-     * for the specific store - so set these up in the context for this specific 
+     * the output location and StoreFunc
+     * for the specific store - so set these up in the context for this specific
      * store
      * @param jobContext the {@link JobContext}
      * @param store the POStore
      * @throws IOException on failure
      */
-    public static void setLocation(JobContext jobContext, POStore store) throws 
+    public static void setLocation(JobContext jobContext, POStore store) throws
     IOException {
         Job storeJob = new Job(jobContext.getConfiguration());
         StoreFuncInterface storeFunc = store.getStoreFunc();
         String outputLocation = store.getSFile().getFileName();
         storeFunc.setStoreLocation(outputLocation, storeJob);
-        
+
         // the setStoreLocation() method would indicate to the StoreFunc
         // to set the output location for its underlying OutputFormat.
         // Typically OutputFormat's store the output location in the
@@ -176,7 +174,7 @@ public class PigOutputFormat extends Out
         // OutputFormat might have set) and merge it with the Configuration
         // we started with so that when this method returns the Configuration
         // supplied as input has the updates.
-        ConfigurationUtil.mergeConf(jobContext.getConfiguration(), 
+        ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
                 storeJob.getConfiguration());
     }
 
@@ -187,20 +185,20 @@ public class PigOutputFormat extends Out
         checkOutputSpecsHelper(reduceStores, jobcontext);
     }
 
-    private void checkOutputSpecsHelper(List<POStore> stores, JobContext 
+    private void checkOutputSpecsHelper(List<POStore> stores, JobContext
             jobcontext) throws IOException, InterruptedException {
         for (POStore store : stores) {
             // make a copy of the original JobContext so that
-            // each OutputFormat get a different copy 
+            // each OutputFormat get a different copy
             JobContext jobContextCopy = HadoopShims.createJobContext(
                     jobcontext.getConfiguration(), jobcontext.getJobID());
-            
+
             // set output location
             PigOutputFormat.setLocation(jobContextCopy, store);
-            
+
             StoreFuncInterface sFunc = store.getStoreFunc();
             OutputFormat of = sFunc.getOutputFormat();
-            
+
             // The above call should have update the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
             try {
@@ -224,23 +222,22 @@ public class PigOutputFormat extends Out
      * @param currentConf2
      * @param storeLookupKey
      * @return
-     * @throws IOException 
+     * @throws IOException
      */
-    private List<POStore> getStores(Configuration conf, String storeLookupKey) 
+    private List<POStore> getStores(Configuration conf, String storeLookupKey)
     throws IOException {
         return (List<POStore>)ObjectSerializer.deserialize(
                 conf.get(storeLookupKey));
     }
-    
-    
-    private void setupUdfEnvAndStores(JobContext jobcontext)
+
+    protected void setupUdfEnvAndStores(JobContext jobcontext)
     throws IOException{
         Configuration newConf = jobcontext.getConfiguration();
-        
-        // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside 
+
+        // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside
         // construct of PigOutputCommitter, can make use of it
         MapRedUtil.setupUDFContext(newConf);
-        
+
         // if there is a udf in the plan we would need to know the import
         // path so we can instantiate the udf. This is required because
         // we will be deserializing the POStores out of the plan in the next
@@ -261,13 +258,13 @@ public class PigOutputFormat extends Out
         // config properties have changed (eg. creating stores).
         currentConf = new Configuration(newConf);
     }
-    
+
     /**
      * Check if given property prop is same in configurations conf1, conf2
      * @param prop
      * @param conf1
      * @param conf2
-     * @return true if both are equal 
+     * @return true if both are equal
      */
     private boolean isConfPropEqual(String prop, Configuration conf1,
             Configuration conf2) {
@@ -283,10 +280,10 @@ public class PigOutputFormat extends Out
     }
 
     @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext 
+    public OutputCommitter getOutputCommitter(TaskAttemptContext
             taskattemptcontext) throws IOException, InterruptedException {
         setupUdfEnvAndStores(taskattemptcontext);
-        
+
         // we return an instance of PigOutputCommitter to Hadoop - this instance
         // will wrap the real OutputCommitter(s) belonging to the store(s)
         return new PigOutputCommitter(taskattemptcontext,

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Sep 29 21:18:28 2014
@@ -19,6 +19,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -101,6 +103,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
@@ -550,7 +553,7 @@ public class TezDagBuilder extends TezOp
                 }
             }
         }
-        JobControlCompiler.setOutputFormat(job);
+        setOutputFormat(job);
 
         // set parent plan in all operators. currently the parent plan is really
         // used only when POStream, POSplit are present in the plan
@@ -1011,4 +1014,25 @@ public class TezDagBuilder extends TezOp
         conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS,
                 comparatorClass);
     }
+
+    private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
+        // the OutputFormat we report to Hadoop is always PigOutputFormat which
+        // can be wrapped with LazyOutputFormat provided if it is supported by
+        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+        if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+            try {
+                Class<?> clazz = PigContext
+                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+                Method method = clazz.getMethod("setOutputFormatClass",
+                        org.apache.hadoop.mapreduce.Job.class, Class.class);
+                method.invoke(null, job, PigOutputFormatTez.class);
+            } catch (Exception e) {
+                job.setOutputFormatClass(PigOutputFormatTez.class);
+                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+            }
+        } else {
+            job.setOutputFormatClass(PigOutputFormatTez.class);
+        }
+    }
 }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java?rev=1628317&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java Mon Sep 29 21:18:28 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.util.UDFContext;
+
+public class PigOutputFormatTez extends PigOutputFormat {
+
+    @Override
+    public OutputCommitter getOutputCommitter(
+            TaskAttemptContext taskattemptcontext) throws IOException,
+            InterruptedException {
+        setupUdfEnvAndStores(taskattemptcontext);
+
+        // we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop - this instance
+        // will wrap the real OutputCommitter(s) belonging to the store(s)
+        return new PigOutputCommitterTez(taskattemptcontext,
+                mapStores,
+                reduceStores);
+    }
+
+    public static class PigOutputCommitterTez extends PigOutputCommitter {
+
+        public PigOutputCommitterTez(TaskAttemptContext context,
+                List<POStore> mapStores, List<POStore> reduceStores)
+                throws IOException {
+            super(context, mapStores, reduceStores);
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+            cleanupForContainerReuse();
+            try {
+                super.setupJob(context);
+            } finally {
+                cleanupForContainerReuse();
+            }
+
+        }
+
+        @Override
+        public void commitJob(JobContext context) throws IOException {
+            cleanupForContainerReuse();
+            try {
+                super.commitJob(context);
+            } finally {
+                cleanupForContainerReuse();
+            }
+        }
+
+        @Override
+        public void abortJob(JobContext context, State state)
+                throws IOException {
+            cleanupForContainerReuse();
+            try {
+                super.abortJob(context, state);
+            } finally {
+                cleanupForContainerReuse();
+            }
+        }
+
+        private void cleanupForContainerReuse() {
+            UDFContext.getUDFContext().reset();
+        }
+
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1628317&r1=1628316&r2=1628317&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Sep 29 21:18:28 2014
@@ -172,7 +172,7 @@ public class MapRedUtil {
         UDFContext udfc = UDFContext.getUDFContext();
         udfc.addJobConf(job);
         // don't deserialize in front-end
-        if (!udfc.isFrontend()) {
+        if (udfc.isUDFConfEmpty()) {
             udfc.deserialize();
         }
     }