You are viewing a plain text version of this content. The canonical link for it ("here" in the original page) is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/29 03:31:00 UTC

svn commit: r1028582 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ test/org/apache/pig/test/

Author: thejas
Date: Fri Oct 29 01:30:59 2010
New Revision: 1028582

URL: http://svn.apache.org/viewvc?rev=1028582&view=rev
Log:
PIG-1684: Inconsistent usage of store func.

Added:
    pig/trunk/test/org/apache/pig/test/TestStoreInstances.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/StoreFunc.java
    pig/trunk/src/org/apache/pig/StoreFuncInterface.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 29 01:30:59 2010
@@ -215,6 +215,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1684: Inconsistent usage of store func. (thejas)
+
 PIG-1694: union-onschema projects null schema at parsing stage for some queries (thejas)
 
 PIG-1685: Pig is unable to handle counters for glob paths ? (daijy)

Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Fri Oct 29 01:30:59 2010
@@ -63,7 +63,7 @@ public abstract class StoreFunc implemen
 
     /**
      * Return the OutputFormat associated with StoreFunc.  This will be called
-     * on the front end during planning and not on the backend during
+     * on the front end during planning and on the backend during
      * execution. 
      * @return the {@link OutputFormat} associated with StoreFunc
      * @throws IOException if an exception occurs while constructing the 

Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Fri Oct 29 01:30:59 2010
@@ -60,7 +60,7 @@ public interface StoreFuncInterface {
 
     /**
      * Return the OutputFormat associated with StoreFuncInterface.  This will be called
-     * on the front end during planning and not on the backend during
+     * on the front end during planning and on the backend during
      * execution. 
      * @return the {@link OutputFormat} associated with StoreFuncInterface
      * @throws IOException if an exception occurs while constructing the 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Oct 29 01:30:59 2010
@@ -56,40 +56,29 @@ public class PigOutputCommitter extends 
     
     /**
      * @param context
+     * @param mapStores stores to build the map output committers from
+     * @param reduceStores stores to build the reduce output committers from
      * @throws IOException
      */
-    public PigOutputCommitter(TaskAttemptContext context)
+    public PigOutputCommitter(TaskAttemptContext context,
+            List<POStore> mapStores, List<POStore> reduceStores)
             throws IOException {
         // create and store the map and reduce output committers
-        mapOutputCommitters = getCommitters(context, 
-                JobControlCompiler.PIG_MAP_STORES);
-        reduceOutputCommitters = getCommitters(context, 
-                JobControlCompiler.PIG_REDUCE_STORES);
+        mapOutputCommitters = getCommitters(context, mapStores);
+        reduceOutputCommitters = getCommitters(context, reduceStores);
         
     }
 
     /**
      * @param conf
-     * @param storeLookupKey
+     * @param mapStores
      * @return
      * @throws IOException 
      */
     @SuppressWarnings("unchecked")
     private List<Pair<OutputCommitter, POStore>> getCommitters(
             TaskAttemptContext context,
-            String storeLookupKey) throws IOException {
-        Configuration conf = context.getConfiguration();
-        
-        // if there is a udf in the plan we would need to know the import
-        // path so we can instantiate the udf. This is required because
-        // we will be deserializing the POStores out of the plan in the next
-        // line below. The POStore inturn has a member reference to the Physical
-        // plan it is part of - so the deserialization goes deep and while
-        // deserializing the plan, the udf.import.list may be needed.
-        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
-                deserialize(conf.get("udf.import.list")));
-        LinkedList<POStore> stores = (LinkedList<POStore>) ObjectSerializer.
-        deserialize(conf.get(storeLookupKey));
+            List<POStore> stores) throws IOException {
         List<Pair<OutputCommitter, POStore>> committers = 
             new ArrayList<Pair<OutputCommitter,POStore>>();
         for (POStore store : stores) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Oct 29 01:30:59 2010
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 /**
@@ -56,17 +58,15 @@ public class PigOutputFormat extends Out
     /** the relative path that can be used to build a temporary
      * place to store the output from a number of map-reduce tasks*/
     public static final String PIG_TMP_PATH =  "pig.tmp.path";
-     
+    // Stores deserialized from the conf, plus a copy of that conf; re-read only when it changes.
+    List<POStore> reduceStores = null;
+    List<POStore> mapStores = null;
+    Configuration currentConf = null;
+    
     @Override
     public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
                 throws IOException, InterruptedException {
-        // Setup UDFContext so StoreFunc can make use of it
-        MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
-        List<POStore> mapStores = getStores(taskattemptcontext, 
-                JobControlCompiler.PIG_MAP_STORES);
-        List<POStore> reduceStores = getStores(taskattemptcontext, 
-                JobControlCompiler.PIG_REDUCE_STORES);
-        
+        setupUdfEnvAndStores(taskattemptcontext);
         if(mapStores.size() + reduceStores.size() == 1) {
             // single store case
             POStore store;
@@ -182,13 +182,8 @@ public class PigOutputFormat extends Out
 
     @Override
     public void checkOutputSpecs(JobContext jobcontext) throws IOException, InterruptedException {
-        // Setup UDFContext so in StoreFunc can make use of it
-        MapRedUtil.setupUDFContext(jobcontext.getConfiguration());
-        List<POStore> mapStores = getStores(jobcontext, 
-                JobControlCompiler.PIG_MAP_STORES);
+        setupUdfEnvAndStores(jobcontext); // sets up UDFContext and refreshes mapStores/reduceStores
         checkOutputSpecsHelper(mapStores, jobcontext);
-        List<POStore> reduceStores = getStores(jobcontext, 
-                JobControlCompiler.PIG_REDUCE_STORES);
         checkOutputSpecsHelper(reduceStores, jobcontext);
     }
 
@@ -212,26 +207,76 @@ public class PigOutputFormat extends Out
         }
     }
     /**
-     * @param jobcontext
+     * @param conf configuration to read the serialized store list from
      * @param storeLookupKey
      * @return
      * @throws IOException 
      */
-    private List<POStore> getStores(JobContext jobcontext, String storeLookupKey) 
+    private List<POStore> getStores(Configuration conf, String storeLookupKey) 
     throws IOException {
-        Configuration conf = jobcontext.getConfiguration();
         return (List<POStore>)ObjectSerializer.deserialize(
                 conf.get(storeLookupKey));
     }
     
+    /** Sets up UDFContext and the UDF import list, then re-reads the stores only when their conf entries change. */
+    private void setupUdfEnvAndStores(JobContext jobcontext)
+    throws IOException{
+        Configuration newConf = jobcontext.getConfiguration();
+        
+        // Set up UDFContext so that StoreFunc methods - e.g. getOutputFormat, which is
+        // called inside the PigOutputCommitter constructor - can make use of it
+        MapRedUtil.setupUDFContext(newConf);
+        
+        // If there is a udf in the plan we need to know the import
+        // path so we can instantiate the udf. This is required because
+        // the POStores may be deserialized out of the conf below. Each
+        // POStore in turn has a member reference to the physical plan it
+        // is part of - so the deserialization goes deep and, while
+        // deserializing the plan, the udf.import.list may be needed.
+        if(! isConfPropEqual("udf.import.list", currentConf, newConf)){
+            PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
+                    deserialize(newConf.get("udf.import.list")));
+        }
+        if(! isConfPropEqual(JobControlCompiler.PIG_MAP_STORES, currentConf, newConf)){
+            mapStores = getStores(newConf, JobControlCompiler.PIG_MAP_STORES);
+        }
+        if(! isConfPropEqual(JobControlCompiler.PIG_REDUCE_STORES, currentConf, newConf)){
+            reduceStores = getStores(newConf, JobControlCompiler.PIG_REDUCE_STORES);
+        }
+        // Keep a copy of the config so that some steps (e.g. deserializing the
+        // stores) are only repeated when the relevant config properties change.
+        currentConf = new Configuration(newConf);
+    }
+    
+    /**
+     * Check whether property prop has the same value in conf1 and conf2.
+     * @param prop name of the configuration property to compare
+     * @param conf1 first configuration; may be null
+     * @param conf2 second configuration; may be null
+     * @return true if both confs are null, or both values are absent or equal
+     */
+    private boolean isConfPropEqual(String prop, Configuration conf1,
+            Configuration conf2) {
+        if (conf1 == null || conf2 == null) {
+            return conf1 == conf2; // both null counts as equal; avoids NPE on conf1.get below
+        }
+        String str1 = conf1.get(prop);
+        String str2 = conf2.get(prop);
+        if (str1 == null || str2 == null) {
+            return str1 == str2; // equal only if the property is absent from both confs
+        }
+        return str1.equals(str2);
+    }
+
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext 
             taskattemptcontext) throws IOException, InterruptedException {
-        // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside 
-        // construct of PigOutputCommitter, can make use of it
-        MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
+        setupUdfEnvAndStores(taskattemptcontext); // also sets up UDFContext for StoreFunc.getOutputFormat
+        // mapStores/reduceStores have just been refreshed from this task's conf
         // we return an instance of PigOutputCommitter to Hadoop - this instance
         // will wrap the real OutputCommitter(s) belonging to the store(s)
-        return new PigOutputCommitter(taskattemptcontext);
+        return new PigOutputCommitter(taskattemptcontext,
+                mapStores,
+                reduceStores);
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Fri Oct 29 01:30:59 2010
@@ -212,10 +212,13 @@ public class POStore extends PhysicalOpe
         return schema;
     }
     
+    /** Returns the store func, creating it (and setting its UDFContext signature) on first call and reusing it afterwards. */
     public StoreFuncInterface getStoreFunc() {
-        StoreFuncInterface sFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
-        sFunc.setStoreFuncUDFContextSignature(signature);
-        return sFunc;
+        if(storer == null){ // NOTE(review): confirm 'storer' is transient - POStores are serialized into the job conf
+            storer = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+            storer.setStoreFuncUDFContextSignature(signature);
+        }
+        return storer;
     }
     
     /**

Added: pig/trunk/test/org/apache/pig/test/TestStoreInstances.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreInstances.java?rev=1028582&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreInstances.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestStoreInstances.java Fri Oct 29 01:30:59 2010
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Test to ensure that the same store func instance is used in the backend
+ *  both for putNext and for creating the output committer. This enables
+ *  sharing of information between putNext and the output committer.
+ * 
+ */
+public class TestStoreInstances  {
+    static MiniCluster cluster ;
+    private static final String INP_FILE_2NUMS = "TestStoreInstances";
+
+    @Before
+    public void setUp() throws Exception {
+        FileLocalizer.setInitialized(false);
+    }
+
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @BeforeClass
+    public static void oneTimeSetup() throws IOException, Exception {
+        cluster = MiniCluster.buildCluster();
+
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        Util.createInputFile(cluster, INP_FILE_2NUMS, input);
+        Util.createLocalInputFile(INP_FILE_2NUMS, input);
+    }
+
+    private static final String CHECK_INSTANCE_STORE_FUNC
+    = "org.apache.pig.test.TestStoreInstances\\$STFuncCheckInstances";
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        new File(INP_FILE_2NUMS).delete();
+        cluster.shutDown();
+    }
+
+    /**
+     * Test that putNext is able to communicate with the output committer, in MAPREDUCE and LOCAL mode
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testBackendStoreCommunication() throws IOException, ParseException {
+        ExecType[] execTypes = { ExecType.MAPREDUCE, ExecType.LOCAL};
+        PigServer pig = null;
+        for(ExecType execType : execTypes){
+            System.err.println("Starting test mode " + execType);
+            if(execType == ExecType.MAPREDUCE) {
+                pig = new PigServer(ExecType.MAPREDUCE, 
+                        cluster.getProperties());
+            }else{
+                pig = new PigServer(ExecType.LOCAL);
+            }
+            final String outFile = "TestStoreInst1";
+            Util.deleteFile(pig.getPigContext(), outFile);
+            pig.setBatchOn();
+            String query =
+                "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" +
+                " store l1 into '" + outFile + "' using " + CHECK_INSTANCE_STORE_FUNC + 
+                ";";
+            Util.registerMultiLineQuery(pig, query);
+            List<ExecJob> execJobs = pig.executeBatch();
+            assertEquals("num jobs", 1, execJobs.size());
+            assertEquals("status ", JOB_STATUS.COMPLETED, execJobs.get(0).getStatus());
+        }
+            
+    }
+
+
+    /**
+     * Store func that records output rows in a list it shares with its OutputFormat
+     */
+    public static class STFuncCheckInstances extends StoreFunc {
+
+        private ArrayList<Tuple> outRows;
+
+        public STFuncCheckInstances(){
+            super();
+            this.outRows = new ArrayList<Tuple>();
+        }
+
+        @Override
+        public OutputFormat getOutputFormat() throws IOException {
+            return new OutFormatCheckInstances(outRows);
+        }
+
+        @Override
+        public void prepareToWrite(RecordWriter writer) throws IOException {
+            // nothing to prepare: putNext only records rows in outRows, the writer is unused
+
+        }
+
+        @Override
+        public void putNext(Tuple t) throws IOException {
+            outRows.add(t);
+
+        }
+
+
+        @Override
+        public void setStoreLocation(String location, Job job)
+                throws IOException {
+            Configuration conf = job.getConfiguration();
+            conf.set("mapred.output.dir", location); // read back by OutputCommitterTestInstances
+            
+        }
+
+
+    }
+
+    /**
+     * OutputFormat for the store func; passes the shared row list on to its output committer
+     */
+    public static class OutFormatCheckInstances extends TextOutputFormat {
+
+        private ArrayList<Tuple> outRows;
+
+        public OutFormatCheckInstances(ArrayList<Tuple> outRows) {
+            super();
+            this.outRows = outRows;
+        }
+
+        @Override
+        public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
+        throws IOException {
+            return new OutputCommitterTestInstances(outRows, arg0);
+        }
+
+    }
+
+    /**
+     * OutputCommitter that asserts in commitTask that the store func recorded at least one row
+     */
+    public static class OutputCommitterTestInstances extends FileOutputCommitter {
+
+
+        private ArrayList<Tuple> outRows;
+
+        public OutputCommitterTestInstances(ArrayList<Tuple> outRows,
+                TaskAttemptContext taskAttemptCtx) throws IOException {
+            super(new Path(taskAttemptCtx.getConfiguration().get("mapred.output.dir")), taskAttemptCtx);
+            this.outRows = outRows;
+        }
+
+   
+        @Override
+        public void commitTask(TaskAttemptContext arg0) {
+            System.err.println("OutputCommitterTestInstances commitTask called");
+            assertTrue("Number of output rows > 0 ",outRows.size() > 0);
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext arg0)
+        throws IOException {
+            return true;
+        }
+
+
+    }
+
+
+}