You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2012/09/10 23:16:46 UTC

svn commit: r1383111 - in /pig/branches/branch-0.9: CHANGES.txt shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java test/org/apache/pig/test/TestMultiQueryLocal.java

Author: daijy
Date: Mon Sep 10 21:16:45 2012
New Revision: 1383111

URL: http://svn.apache.org/viewvc?rev=1383111&view=rev
Log:
PIG-2912: Pig should clone JobConf while creating JobContextImpl and TaskAttemptContextImpl in Hadoop23

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestMultiQueryLocal.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1383111&r1=1383110&r2=1383111&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Mon Sep 10 21:16:45 2012
@@ -30,6 +30,8 @@ PIG-2619: HBaseStorage constructs a Scan
 
 BUG FIXES
 
+PIG-2912: Pig should clone JobConf while creating JobContextImpl and TaskAttemptContextImpl in Hadoop23 (rohini via daijy)
+
 PIG-2484: Fix several e2e test failures/aborts for 23 (daijy)
 
 PIG-2721: Wrong output generated while loading bags as input (knoguchi via daijy)

Modified: pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1383111&r1=1383110&r2=1383111&view=diff
==============================================================================
--- pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Mon Sep 10 21:16:45 2012
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.ContextFactory;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -36,20 +37,27 @@ import org.apache.pig.backend.hadoop.exe
 
 public class HadoopShims {
     static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
-        JobContext newContext = ContextFactory.cloneContext(original, original.getConfiguration());
+        JobContext newContext = ContextFactory.cloneContext(original,
+                new JobConf(original.getConfiguration()));
         return newContext;
     }
-    
-    static public TaskAttemptContext createTaskAttemptContext(Configuration conf, 
+
+    static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
                                 TaskAttemptID taskId) {
-        TaskAttemptContext newContext = new TaskAttemptContextImpl(conf, taskId);
-        return newContext;
+        if (conf instanceof JobConf) {
+            return new TaskAttemptContextImpl(new JobConf(conf), taskId);
+        } else {
+            return new TaskAttemptContextImpl(conf, taskId);
+        }
     }
-    
-    static public JobContext createJobContext(Configuration conf, 
+
+    static public JobContext createJobContext(Configuration conf,
             JobID jobId) {
-        JobContext newContext = new JobContextImpl(conf, jobId);
-        return newContext;
+        if (conf instanceof JobConf) {
+            return new JobContextImpl(new JobConf(conf), jobId);
+        } else {
+            return new JobContextImpl(conf, jobId);
+        }
     }
 
     static public boolean isMap(TaskAttemptID taskAttemptID) {

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1383111&r1=1383110&r2=1383111&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMultiQueryLocal.java Mon Sep 10 21:16:45 2012
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.test;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.FileReader;
 import java.io.StringReader;
 import java.io.IOException;
 import java.io.File;
@@ -27,14 +30,20 @@ import java.util.Collections;
 import java.util.Properties;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
@@ -337,6 +346,89 @@ public class TestMultiQueryLocal {
     }
 
 
+    public static class PigStorageWithConfig extends PigStorage {
+
+        private static final String key = "test.key";
+        private String suffix;
+
+        public PigStorageWithConfig(String s) {
+            this.suffix = s;
+        }
+
+        @Override
+        public void setStoreLocation(String location, Job job) throws IOException {
+            super.setStoreLocation(location, job);
+            Assert.assertNull(job.getConfiguration().get(key));
+        }
+
+        @Override
+        public OutputFormat getOutputFormat() {
+            return new PigTextOutputFormatWithConfig();
+        }
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            try {
+                Tuple t = TupleFactory.getInstance().newTuple();
+                for (Object obj : f.getAll()) {
+                    t.append(obj);
+                }
+                t.append(suffix);
+                writer.write(null, t);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private static class PigTextOutputFormatWithConfig extends PigTextOutputFormat {
+
+        public PigTextOutputFormatWithConfig() {
+            super((byte) '\t');
+        }
+
+        @Override
+        public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
+                throws IOException {
+            context.getConfiguration().set(PigStorageWithConfig.key, "mapred.work.output.dir");
+            return super.getOutputCommitter(context);
+        }
+    }
+
+    // See PIG-2912
+    @Test
+    public void testMultiStoreWithConfig() {
+
+        System.out.println("===== test multi-query with competing config =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("store b into '/tmp/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('a');");
+            myPig.registerQuery("store c into '/tmp/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('b');");
+
+            myPig.executeBatch();
+            myPig.discardBatch();
+            BufferedReader reader = new BufferedReader(new FileReader("/tmp/Pig-TestMultiQueryLocal1/part-m-00000"));
+            String line;
+            while ((line = reader.readLine())!=null) {
+                Assert.assertTrue(line.endsWith("a"));
+            }
+            reader = new BufferedReader(new FileReader("/tmp/Pig-TestMultiQueryLocal2/part-m-00000"));
+            while ((line = reader.readLine())!=null) {
+                Assert.assertTrue(line.endsWith("b"));
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+
     @Test
     public void testMultiQueryWithExplain() {