You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/24 00:10:36 UTC
svn commit: r915577 - in /hadoop/pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLay...
Author: pradeepkth
Date: Tue Feb 23 23:10:36 2010
New Revision: 915577
URL: http://svn.apache.org/viewvc?rev=915577&view=rev
Log:
PIG-1250: Make StoreFunc an abstract class and create a mirror interface called StoreFuncInterface (pradeepkth)
Added:
hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/PigFile.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Feb 23 23:10:36 2010
@@ -22,6 +22,9 @@
INCOMPATIBLE CHANGES
+PIG-1250: Make StoreFunc an abstract class and create a mirror interface
+called StoreFuncInterface (pradeepkth)
+
PIG-1234: Unable to create input slice for har:// files (pradeepkth)
PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Tue Feb 23 23:10:36 2010
@@ -69,7 +69,7 @@
* 1 reducer. So in the above case for e.g. there will be only 1 file each under
* 'a1' and 'a2' directories.
*/
-public class MultiStorage implements StoreFunc {
+public class MultiStorage extends StoreFunc {
private Path outputPath; // User specified output Path
private int splitFieldIndex = -1; // Index of the key field
@@ -141,11 +141,6 @@
}
}
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
-
- }
-
@SuppressWarnings("unchecked")
@Override
public OutputFormat getOutputFormat() throws IOException {
@@ -161,12 +156,6 @@
}
@Override
- public String relToAbsPathForStoreLocation(String location, Path curDir)
- throws IOException {
- return LoadFunc.getAbsolutePath(location, curDir);
- }
-
- @Override
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set("mapred.textoutputformat.separator", "");
FileOutputFormat.setOutputPath(job, new Path(location));
@@ -179,11 +168,6 @@
}
}
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
-
- }
-
//--------------------------------------------------------------------------
// Implementation of OutputFormat
@@ -210,6 +194,7 @@
private ByteArrayOutputStream mOut =
new ByteArrayOutputStream(BUFFER_SIZE);
+ @Override
public void write(String key, Tuple val) throws IOException {
int sz = val.size();
for (int i = 0; i < sz; i++) {
@@ -232,6 +217,7 @@
mOut.reset();
}
+ @Override
public void close(TaskAttemptContext context) throws IOException {
for (MyLineRecordWriter out : storeMap.values()) {
out.close(context);
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Feb 23 23:10:36 2010
@@ -52,7 +52,7 @@
/**
* Pig LoadFunc implementation for Zebra Table
*/
-public class TableStorer implements StoreFunc, StoreMetadata {
+public class TableStorer extends StoreFunc implements StoreMetadata {
private static final String UDFCONTEXT_OUTPUT_SCHEMA = "zebra.UDFContext.outputSchema";
private static final String UDFCONTEXT_SORT_INFO = "zebra.UDFContext.sortInfo";
Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java Tue Feb 23 23:10:36 2010
@@ -24,17 +24,18 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
/**
-* This interface is used to implement functions to write records
+* This abstract class is used to implement functions to write records
* from a dataset.
*
*
*/
-public interface StoreFunc {
+public abstract class StoreFunc implements StoreFuncInterface {
/**
* This method is called by the Pig runtime in the front end to convert the
@@ -42,9 +43,7 @@
* StoreFunc implementation is free to choose how it converts a relative
* location to an absolute location since this may depend on what the location
* string represent (hdfs path or some other data source).
- * The static method {@link LoadFunc#getAbsolutePath} provides a default
- * implementation for hdfs and hadoop local file system and it can be used
- * to implement this method.
+ * The default implementation delegates to {@link LoadFunc#getAbsolutePath}.
*
* @param location location as provided in the "store" statement of the script
* @param curDir the current working direction based on any "cd" statements
@@ -52,9 +51,13 @@
* in the script, this would be the home directory -
* <pre>/user/<username> </pre>
* @return the absolute location based on the arguments passed
+ * @see LoadFunc#getAbsolutePath
* @throws IOException if the conversion is not possible
*/
- String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException;
+ public String relToAbsPathForStoreLocation(String location, Path curDir)
+ throws IOException {
+ return LoadFunc.getAbsolutePath(location, curDir);
+ }
/**
* Return the OutputFormat associated with StoreFunc. This will be called
@@ -65,7 +68,7 @@
* OutputFormat
*
*/
- OutputFormat getOutputFormat() throws IOException;
+ public abstract OutputFormat getOutputFormat() throws IOException;
/**
* Communicate to the store function the location used in Pig Latin to refer
@@ -83,7 +86,7 @@
* @param job The {@link Job} object
* @throws IOException if the location is not valid.
*/
- void setStoreLocation(String location, Job job) throws IOException;
+ public abstract void setStoreLocation(String location, Job job) throws IOException;
/**
* Set the schema for data to be stored. This will be called on the
@@ -97,7 +100,9 @@
* @throws IOException if this schema is not acceptable. It should include
* a detailed error message indicating what is wrong with the schema.
*/
- void checkSchema(ResourceSchema s) throws IOException;
+ public void checkSchema(ResourceSchema s) throws IOException {
+ // default implementation is a no-op
+ }
/**
* Initialize StoreFunc to write data. This will be called during
@@ -105,7 +110,7 @@
* @param writer RecordWriter to use.
* @throws IOException if an exception occurs during initialization
*/
- void prepareToWrite(RecordWriter writer) throws IOException;
+ public abstract void prepareToWrite(RecordWriter writer) throws IOException;
/**
* Write a tuple the output stream to which this instance was
@@ -114,7 +119,7 @@
* @param t the tuple to store.
* @throws IOException if an exception occurs during the write
*/
- void putNext(Tuple t) throws IOException;
+ public abstract void putNext(Tuple t) throws IOException;
/**
* This method will be called by Pig both in the front end and back end to
@@ -123,5 +128,7 @@
* various method invocations in the front end and back end.
* @param signature a unique signature to identify this StoreFunc
*/
- public void setStoreFuncUDFContextSignature(String signature);
+ public void setStoreFuncUDFContextSignature(String signature) {
+ // default implementation is a no-op
+ }
}
Added: hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=915577&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFuncInterface.java Tue Feb 23 23:10:36 2010
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+
+/**
+* This interface is used to implement functions that write the records
+* of a dataset. Implementations may extend {@link StoreFunc}, which
+* supplies default implementations of relToAbsPathForStoreLocation,
+* checkSchema and setStoreFuncUDFContextSignature.
+*/
+
+public interface StoreFuncInterface {
+
+ /**
+ * This method is called by the Pig runtime in the front end to convert the
+ * output location to an absolute path if the location is relative. The
+ * StoreFuncInterface implementation is free to choose how it converts a relative
+ * location to an absolute location since this may depend on what the location
+ * string represents (hdfs path or some other data source).
+ * The static method {@link LoadFunc#getAbsolutePath} provides a default
+ * implementation for hdfs and hadoop local file system and it can be used
+ * to implement this method.
+ *
+ * @param location location as provided in the "store" statement of the script
+ * @param curDir the current working directory based on any "cd" statements
+ * in the script before the "store" statement. If there are no "cd" statements
+ * in the script, this would be the home directory -
+ * {@code /user/<username>}
+ * @return the absolute location based on the arguments passed
+ * @throws IOException if the conversion is not possible
+ */
+ String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException;
+
+ /**
+ * Return the OutputFormat associated with this store function. This will be called
+ * on the front end during planning and not on the backend during
+ * execution.
+ * @return the {@link OutputFormat} associated with this store function
+ * @throws IOException if an exception occurs while constructing the
+ * OutputFormat
+ *
+ */
+ OutputFormat getOutputFormat() throws IOException;
+
+ /**
+ * Communicate to the store function the location used in Pig Latin to refer
+ * to the object(s) being stored. That is, if the Pig Latin script is
+ * <b>store A into 'bla'</b>
+ * then 'bla' is the location. This location should be either a file name
+ * or a URI. If it does not have a URI scheme Pig will assume it is a
+ * filename.
+ * This method will be called in the frontend and backend multiple times. Implementations
+ * should bear in mind that this method is called multiple times and should
+ * ensure there are no inconsistent side effects due to the multiple calls.
+ *
+ *
+ * @param location Location indicated in store statement.
+ * @param job The {@link Job} object
+ * @throws IOException if the location is not valid.
+ */
+ void setStoreLocation(String location, Job job) throws IOException;
+
+ /**
+ * Check the schema of the data to be stored. This will be called on the
+ * front end during planning if the store is associated with a schema.
+ * A Store function should implement this function to
+ * check that a given schema is acceptable to it. For example, it
+ * can check that the correct partition keys are included;
+ * a storage function to be written directly to an OutputFormat can
+ * make sure the schema will translate in a well defined way.
+ * @param s the schema to be checked
+ * @throws IOException if this schema is not acceptable. It should include
+ * a detailed error message indicating what is wrong with the schema.
+ */
+ void checkSchema(ResourceSchema s) throws IOException;
+
+ /**
+ * Initialize the store function to write data. This will be called during
+ * execution before the call to putNext.
+ * @param writer RecordWriter to use.
+ * @throws IOException if an exception occurs during initialization
+ */
+ void prepareToWrite(RecordWriter writer) throws IOException;
+
+ /**
+ * Write a tuple to the output stream to which this instance was
+ * previously bound.
+ *
+ * @param t the tuple to store.
+ * @throws IOException if an exception occurs during the write
+ */
+ void putNext(Tuple t) throws IOException;
+
+ /**
+ * This method will be called by Pig both in the front end and back end to
+ * pass a unique signature to the {@link StoreFuncInterface} which it can use to store
+ * information in the {@link UDFContext} which it needs to store between
+ * various method invocations in the front end and back end.
+ * @param signature a unique signature to identify this StoreFuncInterface
+ */
+ public void setStoreFuncUDFContextSignature(String signature);
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb 23 23:10:36 2010
@@ -47,7 +47,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -449,7 +449,7 @@
for (POStore st: mapStores) {
storeLocations.add(st);
- StoreFunc sFunc = st.getStoreFunc();
+ StoreFuncInterface sFunc = st.getStoreFunc();
if (st.getSchema()!=null)
sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo()));
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
@@ -457,7 +457,7 @@
for (POStore st: reduceStores) {
storeLocations.add(st);
- StoreFunc sFunc = st.getStoreFunc();
+ StoreFuncInterface sFunc = st.getStoreFunc();
if (st.getSchema()!=null)
sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo()));
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
@@ -1070,7 +1070,8 @@
this.conf = conf;
}
- public void visitFRJoin(POFRJoin join) throws VisitorException {
+ @Override
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Tue Feb 23 23:10:36 2010
@@ -22,7 +22,7 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
/**
@@ -55,10 +55,10 @@
@SuppressWarnings("unchecked")
@Override
- public StoreFunc createStoreFunc(POStore store)
+ public StoreFuncInterface createStoreFunc(POStore store)
throws IOException {
- StoreFunc storeFunc = store.getStoreFunc();
+ StoreFuncInterface storeFunc = store.getStoreFunc();
// call the setStoreLocation on the storeFunc giving it the
// Job. Typically this will result in the OutputFormat of the
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Feb 23 23:10:36 2010
@@ -27,7 +27,7 @@
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -102,7 +102,7 @@
List<Pair<OutputCommitter, POStore>> committers =
new ArrayList<Pair<OutputCommitter,POStore>>();
for (POStore store : stores) {
- StoreFunc sFunc = store.getStoreFunc();
+ StoreFuncInterface sFunc = store.getStoreFunc();
TaskAttemptContext updatedContext = setUpContext(context, store);
try {
@@ -155,7 +155,7 @@
static void storeCleanup(POStore store, Configuration conf)
throws IOException {
- StoreFunc storeFunc = store.getStoreFunc();
+ StoreFuncInterface storeFunc = store.getStoreFunc();
if (storeFunc instanceof StoreMetadata) {
Schema schema = store.getSchema();
if (schema != null) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Tue Feb 23 23:10:36 2010
@@ -28,7 +28,7 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -75,7 +75,7 @@
} else {
store = reduceStores.get(0);
}
- StoreFunc sFunc = store.getStoreFunc();
+ StoreFuncInterface sFunc = store.getStoreFunc();
// set output location
PigOutputFormat.setLocation(taskattemptcontext, store);
// The above call should have update the conf in the JobContext
@@ -106,14 +106,14 @@
/**
* the StoreFunc for the single store
*/
- private StoreFunc sFunc;
+ private StoreFuncInterface sFunc;
/**
* Single Query or multi query
*/
private Mode mode;
- public PigRecordWriter(RecordWriter wrappedWriter, StoreFunc sFunc,
+ public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
Mode mode)
throws IOException {
this.mode = mode;
@@ -164,7 +164,7 @@
public static void setLocation(JobContext jobContext, POStore store) throws
IOException {
Job storeJob = new Job(jobContext.getConfiguration());
- StoreFunc storeFunc = store.getStoreFunc();
+ StoreFuncInterface storeFunc = store.getStoreFunc();
String outputLocation = store.getSFile().getFileName();
storeFunc.setStoreLocation(outputLocation, storeJob);
@@ -209,7 +209,7 @@
// set output location
PigOutputFormat.setLocation(jobContextCopy, store);
- StoreFunc sFunc = store.getStoreFunc();
+ StoreFuncInterface sFunc = store.getStoreFunc();
OutputFormat of = sFunc.getOutputFormat();
// The above call should have update the conf in the JobContext
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Tue Feb 23 23:10:36 2010
@@ -18,25 +18,23 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.SortInfo;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -51,7 +49,7 @@
private static final long serialVersionUID = 1L;
private static Result empty = new Result(POStatus.STATUS_NULL, null);
- transient private StoreFunc storer;
+ transient private StoreFuncInterface storer;
transient private final Log log = LogFactory.getLog(getClass());
transient private POStoreImpl impl;
private FileSpec sFile;
@@ -203,8 +201,8 @@
return schema;
}
- public StoreFunc getStoreFunc() {
- StoreFunc sFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+ public StoreFuncInterface getStoreFunc() {
+ StoreFuncInterface sFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
sFunc.setStoreFuncUDFContextSignature(signature);
return sFunc;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java Tue Feb 23 23:10:36 2010
@@ -19,10 +19,7 @@
import java.io.IOException;
-import org.apache.pig.SortInfo;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.StoreFuncInterface;
/**
* This class is used to specify the actual behavior of the store
@@ -34,7 +31,7 @@
* @param store - the POStore object
* @throws IOException
*/
- public abstract StoreFunc createStoreFunc(POStore store)
+ public abstract StoreFuncInterface createStoreFunc(POStore store)
throws IOException;
/**
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Tue Feb 23 23:10:36 2010
@@ -26,20 +26,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.util.Progressable;
-import org.apache.pig.PigException;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
@@ -53,40 +45,10 @@
public static final String FILE_SYSTEM_NAME = "fs.default.name";
/**
- * This method is to be called from an
- * {@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(
- * FileSystem, org.apache.hadoop.mapred.JobConf, String, Progressable)}
- * method to obtain a reference to the {@link org.apache.pig.StoreFunc} object to be used by
- * that OutputFormat to perform the write() operation
- * @param conf the JobConf object
- * @return the StoreFunc reference
- * @throws ExecException
- */
- public static StoreFunc getStoreFunc(Configuration conf) throws ExecException {
- StoreFunc store;
- try {
- String storeFunc = conf.get("pig.storeFunc", "");
- if (storeFunc.length() == 0) {
- store = new PigStorage();
- } else {
- storeFunc = (String) ObjectSerializer.deserialize(storeFunc);
- store = (StoreFunc) PigContext
- .instantiateFuncFromSpec(storeFunc);
- }
- } catch (Exception e) {
- int errCode = 2081;
- String msg = "Unable to setup the store function.";
- throw new ExecException(msg, errCode, PigException.BUG, e);
- }
- return store;
- }
-
- /**
* Loads the key distribution sampler file
*
* @param keyDistFile the name for the distribution file
* @param totalReducers gets set to the total number of reducers as found in the dist file
- * @param job Ref to a jobCong object
* @param keyType Type of the key to be stored in the return map. It currently treats Tuple as a special case.
*/
@SuppressWarnings("unchecked")
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Feb 23 23:10:36 2010
@@ -46,7 +46,7 @@
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
@@ -65,7 +65,7 @@
import org.apache.pig.impl.util.LogUtils;
public class BinStorage extends FileInputLoadFunc
-implements LoadCaster, StoreFunc, LoadMetadata {
+implements LoadCaster, StoreFuncInterface, LoadMetadata {
public static final int RECORD_1 = 0x01;
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Feb 23 23:10:36 2010
@@ -45,15 +45,15 @@
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;
/**
@@ -61,7 +61,7 @@
* delimiter is given as a regular expression. See String.split(delimiter) and
* http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
*/
-public class PigStorage extends FileInputLoadFunc implements StoreFunc,
+public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
LoadPushDown {
protected RecordReader in = null;
protected RecordWriter writer = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOStore.java Tue Feb 23 23:10:36 2010
@@ -21,7 +21,7 @@
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.PlanVisitor;
import org.apache.pig.impl.PigContext;
@@ -31,7 +31,7 @@
private static final long serialVersionUID = 2L;
private FileSpec output;
- transient private StoreFunc storeFunc;
+ transient private StoreFuncInterface storeFunc;
//private static Log log = LogFactory.getLog(LOStore.class);
@@ -45,7 +45,7 @@
output = outputFileSpec;
try {
- storeFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+ storeFunc = (StoreFuncInterface) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate StoreFunc.", e);
}
@@ -55,7 +55,7 @@
return output;
}
- public StoreFunc getStoreFunc() {
+ public StoreFuncInterface getStoreFunc() {
return storeFunc;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/PigFile.java Tue Feb 23 23:10:36 2010
@@ -30,7 +30,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -71,7 +71,7 @@
Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
// create a simulated JobContext
JobContext jc = new JobContext(conf, new JobID());
- StoreFunc sfunc = (StoreFunc)PigContext.instantiateFuncFromSpec(
+ StoreFuncInterface sfunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(
storeFuncSpec);
OutputFormat<?,?> of = sfunc.getOutputFormat();
@@ -105,6 +105,7 @@
oc.cleanupJob(jc);
}
+ @Override
public String toString() {
return "PigFile: file: " + this.file + ", append: " + this.append;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Feb 23 23:10:36 2010
@@ -22,7 +22,7 @@
import java.util.List;
import org.apache.pig.FuncSpec;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
@@ -45,7 +45,7 @@
private String signature;
- transient private StoreFunc mStoreFunc;
+ transient private StoreFuncInterface mStoreFunc;
private static Log log = LogFactory.getLog(LOStore.class);
/**
@@ -68,7 +68,7 @@
// Also remove the commented out import org.apache.pig.impl.PigContext
try {
- mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+ mStoreFunc = (StoreFuncInterface) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
this.mAlias = alias;
this.signature = constructSignature(mAlias, outputFileSpec.getFileName(), mOutputFile.getFuncSpec());
mStoreFunc.setStoreFuncUDFContextSignature(this.signature);
@@ -89,7 +89,7 @@
public void setOutputFile(FileSpec outputFileSpec) throws IOException {
try {
- mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+ mStoreFunc = (StoreFuncInterface) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
} catch (Exception e) {
IOException ioe = new IOException(e.getMessage());
ioe.setStackTrace(e.getStackTrace());
@@ -98,7 +98,7 @@
mOutputFile = outputFileSpec;
}
- public StoreFunc getStoreFunc() {
+ public StoreFuncInterface getStoreFunc() {
return mStoreFunc;
}
@@ -124,6 +124,7 @@
return true;
}
+ @Override
public void visit(LOVisitor v) throws VisitorException {
v.visit(this);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Feb 23 23:10:36 2010
@@ -63,8 +63,8 @@
import org.apache.pig.EvalFunc;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
import org.apache.pig.FuncSpec;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -160,7 +160,7 @@
}
Object obj = PigContext.instantiateFuncFromSpec(funcSpec);
- StoreFunc stoFunc = (StoreFunc)obj;
+ StoreFuncInterface stoFunc = (StoreFuncInterface)obj;
stoFunc.setStoreFuncUDFContextSignature(LOStore.constructSignature(alias, fileName, funcSpec));
try {
@@ -852,7 +852,7 @@
LoadFunc loadFunc = (LoadFunc) func;
break;
case FunctionType.STOREFUNC:
- StoreFunc storeFunc = (StoreFunc) func;
+ StoreFuncInterface storeFunc = (StoreFuncInterface) func;
break;
case FunctionType.PIGTOSTREAMFUNC:
PigToStream ptsFunc = (PigToStream) func;
@@ -2460,7 +2460,7 @@
}
Object obj = PigContext.instantiateFuncFromSpec(funcSpec);
- StoreFunc stoFunc = (StoreFunc)obj;
+ StoreFuncInterface stoFunc = (StoreFuncInterface)obj;
stoFunc.setStoreFuncUDFContextSignature(LOStore.constructSignature(t.image, fileName, funcSpec));
// see the comments in LoadClause for reasons to cache absolutePath
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Tue Feb 23 23:10:36 2010
@@ -21,7 +21,7 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigException;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext ;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOVisitor;
@@ -63,7 +63,7 @@
@Override
protected void visit(LOStore store) throws PlanValidationException{
- StoreFunc sf = store.getStoreFunc();
+ StoreFuncInterface sf = store.getStoreFunc();
String outLoc = store.getOutputFile().getFileName();
Job dummyJob;
String errMsg = "Unexpected error. Could not validate the output " +
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Tue Feb 23 23:10:36 2010
@@ -24,21 +24,13 @@
import junit.framework.TestCase;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
-import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import org.apache.pig.builtin.ARITY;
import org.apache.pig.builtin.BagSize;
import org.apache.pig.builtin.CONCAT;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Feb 23 23:10:36 2010
@@ -2701,7 +2701,7 @@
}
}
- public static class DummyStoreWithOutputFormat implements StoreFunc {
+ public static class DummyStoreWithOutputFormat extends StoreFunc {
/**
*
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=915577&r1=915576&r2=915577&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Tue Feb 23 23:10:36 2010
@@ -401,7 +401,7 @@
}
}
- public static class DummyStore implements StoreFunc, StoreMetadata{
+ public static class DummyStore extends StoreFunc implements StoreMetadata{
@Override
public void checkSchema(ResourceSchema s) throws IOException {