You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/04/14 10:01:57 UTC
svn commit: r1326067 - in /pig/branches/branch-0.10: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/
Author: daijy
Date: Sat Apr 14 08:01:57 2012
New Revision: 1326067
URL: http://svn.apache.org/viewvc?rev=1326067&view=rev
Log:
PIG-2578: Multiple Store-commands mess up mapred.output.dir.
Modified:
pig/branches/branch-0.10/CHANGES.txt
pig/branches/branch-0.10/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/branch-0.10/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/branch-0.10/test/org/apache/pig/test/TestMultiQueryLocal.java
Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1326067&r1=1326066&r2=1326067&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Sat Apr 14 08:01:57 2012
@@ -190,6 +190,8 @@ PIG-2228: support partial aggregation in
BUG FIXES
+PIG-2578: Multiple Store-commands mess up mapred.output.dir. (daijy)
+
PIG-2652: Skew join and order by don't trigger reducer estimation (billgraham via dvryaboy)
PIG-2623: Support S3 paths for registering UDFs (nshkrob via daijy)
Modified: pig/branches/branch-0.10/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1326067&r1=1326066&r2=1326067&view=diff
==============================================================================
--- pig/branches/branch-0.10/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.10/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sat Apr 14 08:01:57 2012
@@ -47,7 +47,7 @@ public class HadoopShims {
static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
TaskAttemptID taskId) {
- TaskAttemptContext newContext = new TaskAttemptContext(conf,
+ TaskAttemptContext newContext = new TaskAttemptContext(new Configuration(conf),
taskId);
return newContext;
}
@@ -55,7 +55,7 @@ public class HadoopShims {
static public JobContext createJobContext(Configuration conf,
JobID jobId) {
JobContext newJobContext = new JobContext(
- conf, jobId);
+ new Configuration(conf), jobId);
return newJobContext;
}
Modified: pig/branches/branch-0.10/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1326067&r1=1326066&r2=1326067&view=diff
==============================================================================
--- pig/branches/branch-0.10/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.10/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sat Apr 14 08:01:57 2012
@@ -42,13 +42,13 @@ public class HadoopShims {
static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
TaskAttemptID taskId) {
- TaskAttemptContext newContext = new TaskAttemptContextImpl(conf, taskId);
+ TaskAttemptContext newContext = new TaskAttemptContextImpl(new Configuration(conf), taskId);
return newContext;
}
static public JobContext createJobContext(Configuration conf,
JobID jobId) {
- JobContext newContext = new JobContextImpl(conf, jobId);
+ JobContext newContext = new JobContextImpl(new Configuration(conf), jobId);
return newContext;
}
Modified: pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1326067&r1=1326066&r2=1326067&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sat Apr 14 08:01:57 2012
@@ -466,13 +466,13 @@ public class JobControlCompiler{
for (POStore st: mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
- sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ sFunc.setStoreLocation(st.getSFile().getFileName(), new org.apache.hadoop.mapreduce.Job(nwJob.getConfiguration()));
}
for (POStore st: reduceStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
- sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ sFunc.setStoreLocation(st.getSFile().getFileName(), new org.apache.hadoop.mapreduce.Job(nwJob.getConfiguration()));
}
// the OutputFormat we report to Hadoop is always PigOutputFormat
Modified: pig/branches/branch-0.10/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1326067&r1=1326066&r2=1326067&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/TestMultiQueryLocal.java Sat Apr 14 08:01:57 2012
@@ -17,6 +17,8 @@
*/
package org.apache.pig.test;
+import java.io.BufferedReader;
+import java.io.FileReader;
import java.io.StringReader;
import java.io.IOException;
import java.io.File;
@@ -29,12 +31,16 @@ import java.util.Properties;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
@@ -335,7 +341,69 @@ public class TestMultiQueryLocal {
Assert.fail();
}
}
+
+ /**
+ * Test StoreFunc for PIG-2578: a PigStorage that appends a per-store suffix
+ * as an extra trailing field to every tuple it writes.
+ *
+ * The suffix is round-tripped through the Job Configuration under
+ * "test.key" so that config leakage between stores becomes visible in the output.
+ * If two stores are handed the same Configuration object, the second store
+ * finds the first store's value already set and adopts it, writing the wrong suffix.
+ */
+ public static class PigStorageWithSuffix extends PigStorage {
+
+ // Value appended to each output tuple; initially the constructor argument,
+ // replaced in setStoreLocation by whatever "test.key" holds in the job config.
+ private String suffix;
+ public PigStorageWithSuffix(String s) {
+ this.suffix = s;
+ }
+ // Configuration key used as the shared channel for the suffix.
+ static private final String key="test.key";
+ /**
+ * Delegates to PigStorage, then publishes this store's suffix under
+ * "test.key" only if no value is present yet. It always reads the value back
+ * from the configuration, so a value set earlier by another store wins.
+ */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ super.setStoreLocation(location, job);
+ if (job.getConfiguration().get(key)==null) {
+ job.getConfiguration().set(key, suffix);
+ }
+ // Order matters: reading back after the conditional set is what exposes
+ // a Configuration shared between stores.
+ suffix = job.getConfiguration().get(key);
+ }
+
+ /**
+ * Writes a copy of the incoming tuple with the suffix appended as a final field.
+ * InterruptedException from the underlying writer is rethrown wrapped in an IOException.
+ */
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ try {
+ Tuple t = TupleFactory.getInstance().newTuple();
+ for (Object obj : f.getAll()) {
+ t.append(obj);
+ }
+ t.append(suffix);
+ // presumably PigStorage's inherited RecordWriter field - verify against PigStorage
+ writer.write(null, t);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ // See PIG-2578: two stores in one batch whose StoreFuncs write the same
+ // Configuration key. If the stores shared one Job Configuration, the second
+ // store would inherit the first store's value and its output lines would end
+ // with the wrong suffix.
+ @Test
+ public void testMultiStoreWithConfig() {
+
+ System.out.println("===== test multi-query with competing config =====");
+
+ String out1 = "/tmp/Pig-TestMultiQueryLocal1";
+ String out2 = "/tmp/Pig-TestMultiQueryLocal2";
+ // Reference the fixture by its real (nested) class name; no UDF is
+ // registered under any shorter alias.
+ String storeFunc = PigStorageWithSuffix.class.getName();
+ try {
+ // Pig refuses to store into an existing location, so remove output
+ // left behind by an earlier run of this test.
+ deleteLocalPath(new File(out1));
+ deleteLocalPath(new File(out2));
+
+ myPig.setBatchOn();
+ myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+ myPig.registerQuery("store a into '" + out1 + "' using " + storeFunc + "('a');");
+ myPig.registerQuery("store a into '" + out2 + "' using " + storeFunc + "('b');");
+
+ myPig.executeBatch();
+ myPig.discardBatch();
+
+ assertLinesEndWithSuffix(out1 + "/part-m-00000", "a");
+ assertLinesEndWithSuffix(out2 + "/part-m-00000", "b");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("testMultiStoreWithConfig failed: " + e);
+ }
+ }
+
+ /**
+ * Asserts that the local file at {@code path} has at least one line and that
+ * every line ends with {@code suffix}. The reader is always closed.
+ */
+ private static void assertLinesEndWithSuffix(String path, String suffix) throws IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(path));
+ try {
+ int lineCount = 0;
+ String line;
+ while ((line = reader.readLine()) != null) {
+ Assert.assertTrue("line in " + path + " does not end with '" + suffix + "': " + line,
+ line.endsWith(suffix));
+ lineCount++;
+ }
+ // Guard against a vacuous pass on an empty output file.
+ Assert.assertTrue("no output lines in " + path, lineCount > 0);
+ } finally {
+ reader.close();
+ }
+ }
+
+ /** Deletes a local file or directory tree; does nothing if it does not exist. */
+ private static void deleteLocalPath(File path) {
+ // listFiles() returns null for plain files and missing paths.
+ File[] children = path.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ deleteLocalPath(child);
+ }
+ }
+ path.delete();
+ }
@Test
public void testMultiQueryWithExplain() {