You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/09/11 00:19:07 UTC
svn commit: r1383134 - in /pig/trunk: CHANGES.txt
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
test/org/apache/pig/test/TestMultiQueryLocal.java
Author: daijy
Date: Mon Sep 10 22:19:07 2012
New Revision: 1383134
URL: http://svn.apache.org/viewvc?rev=1383134&view=rev
Log:
PIG-2912: Pig should clone JobConf while creating JobContextImpl and TaskAttemptContextImpl in Hadoop23
Modified:
pig/trunk/CHANGES.txt
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1383134&r1=1383133&r2=1383134&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 10 22:19:07 2012
@@ -741,6 +741,8 @@ Release 0.9.3 - Unreleased
BUG FIXES
+PIG-2912: Pig should clone JobConf while creating JobContextImpl and TaskAttemptContextImpl in Hadoop23 (rohini via daijy)
+
PIG-2775: Register jar does not goes to classpath in some cases (daijy)
PIG-2693: LoadFunc.setLocation should be called before LoadMetadata.getStatistics (billgraham via julien)
Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1383134&r1=1383133&r2=1383134&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Mon Sep 10 22:19:07 2012
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.ContextFactory;
@@ -37,20 +38,27 @@ import org.apache.pig.impl.PigContext;
public class HadoopShims {
static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
- JobContext newContext = ContextFactory.cloneContext(original, original.getConfiguration());
+ JobContext newContext = ContextFactory.cloneContext(original,
+ new JobConf(original.getConfiguration()));
return newContext;
}
static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
TaskAttemptID taskId) {
- TaskAttemptContext newContext = new TaskAttemptContextImpl(conf, taskId);
- return newContext;
+ if (conf instanceof JobConf) {
+ return new TaskAttemptContextImpl(new JobConf(conf), taskId);
+ } else {
+ return new TaskAttemptContextImpl(conf, taskId);
+ }
}
static public JobContext createJobContext(Configuration conf,
JobID jobId) {
- JobContext newContext = new JobContextImpl(conf, jobId);
- return newContext;
+ if (conf instanceof JobConf) {
+ return new JobContextImpl(new JobConf(conf), jobId);
+ } else {
+ return new JobContextImpl(conf, jobId);
+ }
}
static public boolean isMap(TaskAttemptID taskAttemptID) {
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1383134&r1=1383133&r2=1383134&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Mon Sep 10 22:19:07 2012
@@ -17,6 +17,9 @@
*/
package org.apache.pig.test;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.FileReader;
import java.io.StringReader;
import java.io.IOException;
import java.io.File;
@@ -27,14 +30,20 @@ import java.util.Collections;
import java.util.Properties;
import junit.framework.Assert;
-import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
@@ -336,6 +345,89 @@ public class TestMultiQueryLocal {
}
}
+ public static class PigStorageWithConfig extends PigStorage {
+
+ private static final String key = "test.key";
+ private String suffix;
+
+ public PigStorageWithConfig(String s) {
+ this.suffix = s;
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ super.setStoreLocation(location, job);
+ Assert.assertNull(job.getConfiguration().get(key));
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() {
+ return new PigTextOutputFormatWithConfig();
+ }
+
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ try {
+ Tuple t = TupleFactory.getInstance().newTuple();
+ for (Object obj : f.getAll()) {
+ t.append(obj);
+ }
+ t.append(suffix);
+ writer.write(null, t);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private static class PigTextOutputFormatWithConfig extends PigTextOutputFormat {
+
+ public PigTextOutputFormatWithConfig() {
+ super((byte) '\t');
+ }
+
+ @Override
+ public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException {
+ context.getConfiguration().set(PigStorageWithConfig.key, "mapred.work.output.dir");
+ return super.getOutputCommitter(context);
+ }
+ }
+
+ // See PIG-2912
+ @Test
+ public void testMultiStoreWithConfig() {
+
+ System.out.println("===== test multi-query with competing config =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid > 5;");
+ myPig.registerQuery("store b into '/tmp/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('a');");
+ myPig.registerQuery("store c into '/tmp/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('b');");
+
+ myPig.executeBatch();
+ myPig.discardBatch();
+ BufferedReader reader = new BufferedReader(new FileReader("/tmp/Pig-TestMultiQueryLocal1/part-m-00000"));
+ String line;
+ while ((line = reader.readLine())!=null) {
+ Assert.assertTrue(line.endsWith("a"));
+ }
+ reader = new BufferedReader(new FileReader("/tmp/Pig-TestMultiQueryLocal2/part-m-00000"));
+ while ((line = reader.readLine())!=null) {
+ Assert.assertTrue(line.endsWith("b"));
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
@Test
public void testMultiQueryWithExplain() {