You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/29 03:31:00 UTC
svn commit: r1028582 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/
Author: thejas
Date: Fri Oct 29 01:30:59 2010
New Revision: 1028582
URL: http://svn.apache.org/viewvc?rev=1028582&view=rev
Log:
PIG-1684: Inconsistent usage of store func.
Added:
pig/trunk/test/org/apache/pig/test/TestStoreInstances.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/StoreFunc.java
pig/trunk/src/org/apache/pig/StoreFuncInterface.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 29 01:30:59 2010
@@ -215,6 +215,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1684: Inconsistent usage of store func. (thejas)
+
PIG-1694: union-onschema projects null schema at parsing stage for some queries (thejas)
PIG-1685: Pig is unable to handle counters for glob paths ? (daijy)
Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Fri Oct 29 01:30:59 2010
@@ -63,7 +63,7 @@ public abstract class StoreFunc implemen
/**
* Return the OutputFormat associated with StoreFunc. This will be called
- * on the front end during planning and not on the backend during
+ * on the front end during planning and on the backend during
* execution.
* @return the {@link OutputFormat} associated with StoreFunc
* @throws IOException if an exception occurs while constructing the
Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Fri Oct 29 01:30:59 2010
@@ -60,7 +60,7 @@ public interface StoreFuncInterface {
/**
* Return the OutputFormat associated with StoreFuncInterface. This will be called
- * on the front end during planning and not on the backend during
+ * on the front end during planning and on the backend during
* execution.
* @return the {@link OutputFormat} associated with StoreFuncInterface
* @throws IOException if an exception occurs while constructing the
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Oct 29 01:30:59 2010
@@ -56,40 +56,29 @@ public class PigOutputCommitter extends
/**
* @param context
+ * @param mapStores stores of the map phase whose OutputCommitters this committer wraps
+ * @param reduceStores stores of the reduce phase whose OutputCommitters this committer wraps
* @throws IOException
*/
- public PigOutputCommitter(TaskAttemptContext context)
+ public PigOutputCommitter(TaskAttemptContext context,
+ List<POStore> mapStores, List<POStore> reduceStores)
throws IOException {
// create and store the map and reduce output committers
- mapOutputCommitters = getCommitters(context,
- JobControlCompiler.PIG_MAP_STORES);
- reduceOutputCommitters = getCommitters(context,
- JobControlCompiler.PIG_REDUCE_STORES);
+ mapOutputCommitters = getCommitters(context, mapStores);
+ reduceOutputCommitters = getCommitters(context, reduceStores);
}
/**
* @param conf
- * @param storeLookupKey
+ * @param stores the POStores for which OutputCommitters are created
* @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
private List<Pair<OutputCommitter, POStore>> getCommitters(
TaskAttemptContext context,
- String storeLookupKey) throws IOException {
- Configuration conf = context.getConfiguration();
-
- // if there is a udf in the plan we would need to know the import
- // path so we can instantiate the udf. This is required because
- // we will be deserializing the POStores out of the plan in the next
- // line below. The POStore inturn has a member reference to the Physical
- // plan it is part of - so the deserialization goes deep and while
- // deserializing the plan, the udf.import.list may be needed.
- PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
- deserialize(conf.get("udf.import.list")));
- LinkedList<POStore> stores = (LinkedList<POStore>) ObjectSerializer.
- deserialize(conf.get(storeLookupKey));
+ List<POStore> stores) throws IOException {
List<Pair<OutputCommitter, POStore>> committers =
new ArrayList<Pair<OutputCommitter,POStore>>();
for (POStore store : stores) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Oct 29 01:30:59 2010
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
/**
@@ -56,17 +58,15 @@ public class PigOutputFormat extends Out
/** the relative path that can be used to build a temporary
* place to store the output from a number of map-reduce tasks*/
public static final String PIG_TMP_PATH = "pig.tmp.path";
-
+ // State cached by setupUdfEnvAndStores; refreshed only when the relevant conf properties change.
+ List<POStore> reduceStores = null; // deserialized from JobControlCompiler.PIG_REDUCE_STORES
+ List<POStore> mapStores = null; // deserialized from JobControlCompiler.PIG_MAP_STORES
+ Configuration currentConf = null; // copy of the last conf passed to setupUdfEnvAndStores
+
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
throws IOException, InterruptedException {
- // Setup UDFContext so StoreFunc can make use of it
- MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
- List<POStore> mapStores = getStores(taskattemptcontext,
- JobControlCompiler.PIG_MAP_STORES);
- List<POStore> reduceStores = getStores(taskattemptcontext,
- JobControlCompiler.PIG_REDUCE_STORES);
-
+ setupUdfEnvAndStores(taskattemptcontext);
if(mapStores.size() + reduceStores.size() == 1) {
// single store case
POStore store;
@@ -182,13 +182,8 @@ public class PigOutputFormat extends Out
@Override
public void checkOutputSpecs(JobContext jobcontext) throws IOException, InterruptedException {
- // Setup UDFContext so in StoreFunc can make use of it
- MapRedUtil.setupUDFContext(jobcontext.getConfiguration());
- List<POStore> mapStores = getStores(jobcontext,
- JobControlCompiler.PIG_MAP_STORES);
+ setupUdfEnvAndStores(jobcontext); // sets up UDFContext and (re)loads the mapStores/reduceStores fields used below
checkOutputSpecsHelper(mapStores, jobcontext);
- List<POStore> reduceStores = getStores(jobcontext,
- JobControlCompiler.PIG_REDUCE_STORES);
checkOutputSpecsHelper(reduceStores, jobcontext);
}
@@ -212,26 +207,76 @@ public class PigOutputFormat extends Out
}
}
/**
- * @param jobcontext
+ * @param conf configuration holding the serialized stores under storeLookupKey
* @param storeLookupKey
* @return
* @throws IOException
*/
- private List<POStore> getStores(JobContext jobcontext, String storeLookupKey)
+ private List<POStore> getStores(Configuration conf, String storeLookupKey)
throws IOException {
- Configuration conf = jobcontext.getConfiguration();
return (List<POStore>)ObjectSerializer.deserialize(
conf.get(storeLookupKey));
}
+ /** Sets up the UDF environment and (re)loads the map/reduce stores from jobcontext's conf, caching them across calls. */
+ private void setupUdfEnvAndStores(JobContext jobcontext)
+ throws IOException{
+ Configuration newConf = jobcontext.getConfiguration();
+
+ // Set up the UDFContext so that StoreFunc methods - e.g. getOutputFormat, which
+ // is called inside the PigOutputCommitter constructor - can make use of it
+ MapRedUtil.setupUDFContext(newConf);
+
+ // If there is a udf in the plan we need to know the import path so we
+ // can instantiate the udf. This is required because the POStores are
+ // deserialized out of the conf below (getStores), and each POStore in turn
+ // has a member reference to the physical plan it is part of - so the
+ // deserialization goes deep, and while deserializing the plan the
+ // udf.import.list may be needed. Hence it is set up before the stores.
+ if(! isConfPropEqual("udf.import.list", currentConf, newConf)){
+ PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
+ deserialize(newConf.get("udf.import.list")));
+ }
+ if(! isConfPropEqual(JobControlCompiler.PIG_MAP_STORES, currentConf, newConf)){
+ mapStores = getStores(newConf, JobControlCompiler.PIG_MAP_STORES);
+ }
+ if(! isConfPropEqual(JobControlCompiler.PIG_REDUCE_STORES, currentConf, newConf)){
+ reduceStores = getStores(newConf, JobControlCompiler.PIG_REDUCE_STORES);
+ }
+ // Keep a copy of the config so that the steps above (e.g. deserializing the
+ // stores) are only repeated when the relevant properties have changed.
+ currentConf = new Configuration(newConf);
+ }
+
+ /**
+ * Check (null-safely) whether property prop has the same value in conf1 and conf2.
+ * @param prop name of the configuration property to compare
+ * @param conf1 first configuration; may be null (e.g. before any conf was cached)
+ * @param conf2 second configuration; may be null
+ * @return true if both confs are null, or prop is unset in both, or both values are equal
+ */
+ private boolean isConfPropEqual(String prop, Configuration conf1,
+ Configuration conf2) {
+ if (conf1 == null || conf2 == null) {
+ return conf1 == conf2; // equal only when both are null (avoids NPE below)
+ }
+ String str1 = conf1.get(prop);
+ String str2 = conf2.get(prop);
+ if (str1 == null || str2 == null) {
+ return str1 == str2; // unset in both counts as equal (avoids NPE on equals)
+ }
+ return str1.equals(str2);
+ }
+
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext
taskattemptcontext) throws IOException, InterruptedException {
- // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside
- // construct of PigOutputCommitter, can make use of it
- MapRedUtil.setupUDFContext(taskattemptcontext.getConfiguration());
+ setupUdfEnvAndStores(taskattemptcontext);
+ // mapStores/reduceStores are the cached lists; they are re-deserialized only when the conf changed
// we return an instance of PigOutputCommitter to Hadoop - this instance
// will wrap the real OutputCommitter(s) belonging to the store(s)
- return new PigOutputCommitter(taskattemptcontext);
+ return new PigOutputCommitter(taskattemptcontext,
+ mapStores,
+ reduceStores);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1028582&r1=1028581&r2=1028582&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Fri Oct 29 01:30:59 2010
@@ -212,10 +212,13 @@ public class POStore extends PhysicalOpe
return schema;
}
+
public StoreFuncInterface getStoreFunc() {
- StoreFuncInterface sFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
- sFunc.setStoreFuncUDFContextSignature(signature);
- return sFunc;
+ if(storer == null){ // instantiate once and cache, so all callers on this POStore share one StoreFunc instance
+ storer = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+ storer.setStoreFuncUDFContextSignature(signature);
+ } // NOTE(review): POStores are Java-serialized into the job conf; confirm the storer field (declared elsewhere) is transient
+ return storer;
}
/**
Added: pig/trunk/test/org/apache/pig/test/TestStoreInstances.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreInstances.java?rev=1028582&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreInstances.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestStoreInstances.java Fri Oct 29 01:30:59 2010
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Test to ensure that the same StoreFunc instance is used in the backend both
+ * for writing records (putNext) and for creating the OutputCommitter, so that
+ * information can be shared between putNext and the output committer.
+ *
+ */
+public class TestStoreInstances {
+ static MiniCluster cluster ;
+ private static final String INP_FILE_2NUMS = "TestStoreInstances";
+
+ @Before
+ public void setUp() throws Exception {
+ FileLocalizer.setInitialized(false);
+ }
+
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @BeforeClass
+ public static void oneTimeSetup() throws IOException, Exception {
+ cluster = MiniCluster.buildCluster();
+
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ Util.createInputFile(cluster, INP_FILE_2NUMS, input);
+ Util.createLocalInputFile(INP_FILE_2NUMS, input);
+ }
+
+ private static final String CHECK_INSTANCE_STORE_FUNC
+ = "org.apache.pig.test.TestStoreInstances\\$STFuncCheckInstances";
+ // NOTE(review): oneTimeTearDown deletes only the local input copy; the MiniCluster copy is presumably discarded with the cluster.
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ new File(INP_FILE_2NUMS).delete();
+ cluster.shutDown();
+ }
+
+ /**
+ * Test that putNext is able to communicate with the OutputCommitter in MAPREDUCE and LOCAL mode; a failed check in commitTask should surface as a non-COMPLETED job.
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testBackendStoreCommunication() throws IOException, ParseException {
+ ExecType[] execTypes = { ExecType.MAPREDUCE, ExecType.LOCAL};
+ PigServer pig = null;
+ for(ExecType execType : execTypes){
+ System.err.println("Starting test mode " + execType);
+ if(execType == ExecType.MAPREDUCE) {
+ pig = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ }else{
+ pig = new PigServer(ExecType.LOCAL);
+ }
+ final String outFile = "TestStoreInst1";
+ Util.deleteFile(pig.getPigContext(), outFile);
+ pig.setBatchOn();
+ String query =
+ " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" +
+ " store l1 into '" + outFile + "' using " + CHECK_INSTANCE_STORE_FUNC +
+ ";";
+ Util.registerMultiLineQuery(pig, query);
+ List<ExecJob> execJobs = pig.executeBatch();
+ assertEquals("num jobs", 1, execJobs.size());
+ assertEquals("status ", JOB_STATUS.COMPLETED, execJobs.get(0).getStatus());
+ }
+
+ }
+
+
+ /**
+ * Store func that buffers output rows in an in-memory list shared with the OutputFormat/OutputCommitter it creates
+ */
+ public static class STFuncCheckInstances extends StoreFunc {
+
+ private ArrayList<Tuple> outRows;
+
+ public STFuncCheckInstances(){
+ super();
+ this.outRows = new ArrayList<Tuple>();
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new OutFormatCheckInstances(outRows);
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ // no-op: rows are buffered in outRows by putNext, not written through the RecordWriter
+
+ }
+
+ @Override
+ public void putNext(Tuple t) throws IOException {
+ outRows.add(t);
+
+ }
+
+
+ @Override
+ public void setStoreLocation(String location, Job job)
+ throws IOException {
+ Configuration conf = job.getConfiguration();
+ conf.set("mapred.output.dir", location);
+
+ }
+
+
+ }
+
+ /**
+ * OutputFormat for the store func; passes the store func's row list on to the OutputCommitter it creates
+ */
+ public static class OutFormatCheckInstances extends TextOutputFormat {
+
+ private ArrayList<Tuple> outRows;
+
+ public OutFormatCheckInstances(ArrayList<Tuple> outRows) {
+ super();
+ this.outRows = outRows;
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
+ throws IOException {
+ return new OutputCommitterTestInstances(outRows, arg0);
+ }
+
+ }
+
+ /**
+ * OutputCommitter whose commitTask asserts that the store func's putNext recorded at least one row
+ */
+ public static class OutputCommitterTestInstances extends FileOutputCommitter {
+
+
+ private ArrayList<Tuple> outRows;
+
+ public OutputCommitterTestInstances(ArrayList<Tuple> outRows,
+ TaskAttemptContext taskAttemptCtx) throws IOException {
+ super(new Path(taskAttemptCtx.getConfiguration().get("mapred.output.dir")), taskAttemptCtx);
+ this.outRows = outRows;
+ }
+
+ // NOTE(review): commitTask does not call super.commitTask(), so task output is presumably not committed; only the row count is checked.
+ @Override
+ public void commitTask(TaskAttemptContext arg0) {
+ System.err.println("OutputCommitterTestInstances commitTask called");
+ assertTrue("Number of output rows > 0 ",outRows.size() > 0);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0)
+ throws IOException {
+ return true;
+ }
+
+
+ }
+
+
+}