You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/04/17 17:47:13 UTC
svn commit: r649154 - in /incubator/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/local/executionengine/
src/org/apache/pig/builtin/ test/org/apache/pig/test/
Author: gates
Date: Thu Apr 17 08:47:08 2008
New Revision: 649154
URL: http://svn.apache.org/viewvc?rev=649154&view=rev
Log:
PIG-114: storing one alias/logicalPlan twice leads to instantiation of a StoreFunc as a LoadFunc.
Added:
incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java
incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Apr 17 08:47:08 2008
@@ -237,3 +237,6 @@
PIG-183: Catch when a UDF has been compiled with the wrong version of
java and give a RuntimeException (pi_song via gates).
+
+ PIG-114: store one alias/logicalPlan twice leads to instantiation of
+ StoreFunc as LoadFunc (pi_song via gates).
Added: incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java?rev=649154&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java Thu Apr 17 08:47:08 2008
@@ -0,0 +1,14 @@
+package org.apache.pig;
+
+/**
+ * A storage function that implements both the {@link LoadFunc} and the
+ * {@link StoreFunc} contracts in a symmetric (reversible) way: data written
+ * through its store side can be read back through its load side, with the
+ * same function, and yields the same tuples.
+ *
+ * <p>The execution engines rely on this property as an optimization. When an
+ * alias whose result has already been materialized by a previous store is
+ * needed again, the stored output is loaded back with the same function
+ * instead of recompiling the plan. This reuse happens only if that function
+ * implements this interface. Implementing this interface for a function
+ * whose stored format does not round-trip through its loader is therefore
+ * likely to produce unexpected output.
+ */
+public interface ReversibleLoadStoreFunc extends LoadFunc, StoreFunc {
+
+}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu Apr 17 08:47:08 2008
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Iterator;
+import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.FunctionInstantiator;
@@ -83,16 +84,18 @@
MapRedResult materializedResult = materializedResults.get(logicalKey);
if (materializedResult != null) {
- POMapreduce pom = new POMapreduce(logicalKey.getScope(),
- nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
- execEngine.getPhysicalOpTable(),
- logicalKey,
- pigContext);
+ if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec())
+ instanceof ReversibleLoadStoreFunc) {
+ POMapreduce pom = new POMapreduce(logicalKey.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ execEngine.getPhysicalOpTable(), logicalKey,
+ pigContext);
- pom.addInputFile(materializedResult.outFileSpec);
- pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
+ pom.addInputFile(materializedResult.outFileSpec);
+ pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
- return pom.getOperatorKey();
+ return pom.getOperatorKey();
+ }
}
// first, compile inputs into MapReduce operators
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Apr 17 08:47:08 2008
@@ -6,6 +6,7 @@
import java.util.HashMap;
import java.util.HashSet;
+import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.impl.PigContext;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
@@ -161,16 +162,20 @@
LocalResult materializedResult = materializedResults.get(logicalKey);
if (materializedResult != null) {
- ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(),
- nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
- physicalOpTable,
- pigContext,
- materializedResult.outFileSpec,
- LogicalOperator.FIXED);
- OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId());
-
- return ppKey;
+ if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec())
+ instanceof ReversibleLoadStoreFunc) {
+ ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(),
+ nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+ physicalOpTable,
+ pigContext,
+ materializedResult.outFileSpec,
+ LogicalOperator.FIXED);
+
+ OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId());
+ return ppKey;
+ }
+
}
OperatorKey physicalKey = new OperatorKey();
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Thu Apr 17 08:47:08 2008
@@ -24,13 +24,11 @@
import java.io.OutputStream;
import java.util.Iterator;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
-
-public class BinStorage implements LoadFunc, StoreFunc {
+public class BinStorage implements ReversibleLoadStoreFunc {
Iterator<Tuple> i = null;
protected BufferedPositionedInputStream in = null;
private DataInputStream inData = null;
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Apr 17 08:47:08 2008
@@ -21,8 +21,7 @@
import java.io.OutputStream;
import java.nio.charset.Charset;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -32,7 +31,7 @@
* delimiter is given as a regular expression. See String.split(delimiter) and
* http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
*/
-public class PigStorage implements LoadFunc, StoreFunc {
+public class PigStorage implements ReversibleLoadStoreFunc {
protected BufferedPositionedInputStream in = null;
long end = Long.MAX_VALUE;
private byte recordDel = (byte)'\n';
Added: incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java?rev=649154&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java Thu Apr 17 08:47:08 2008
@@ -0,0 +1,241 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+public class TestReversibleLoadStore extends TestCase {
+
+ static List<Tuple> _storedTuples = new ArrayList<Tuple>();
+
+ public void testLocalNoReuse() throws Exception {
+ runNoReuseTest(ExecType.LOCAL) ;
+ }
+
+ public void testMapReduceNoReuse() throws Exception {
+ runNoReuseTest(ExecType.MAPREDUCE) ;
+ }
+
+ public void testLocalReuse() throws Exception {
+ runReuseTest(ExecType.LOCAL) ;
+ }
+
+ public void testMapReduceReuse() throws Exception {
+ runReuseTest(ExecType.MAPREDUCE) ;
+ }
+
+ public void runNoReuseTest(ExecType runType) throws Exception {
+
+ DummyLoadFunc.readCounterMap = null ;
+ DummyStoreFunc.writeCounter = 0 ;
+
+ File tmpFile = createTempFile() ;
+
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ pig.registerQuery("A = LOAD 'file:" + tmpFile.getAbsolutePath() + "' USING "
+ + DummyLoadFunc.class.getName() + "();");
+
+ String file1 = "/tmp/testPigOutput" ;
+ if (pig.existsFile(file1)) {
+ pig.deleteFile(file1) ;
+ }
+
+ pig.store("A", file1, DummyStoreFunc.class.getName() + "()");
+
+ String file2 = "/tmp/testPigOutput2" ;
+ if (pig.existsFile(file2)) {
+ pig.deleteFile(file2) ;
+ }
+ pig.store("A", file2, DummyStoreFunc.class.getName() + "()");
+
+ // for this test the plan will not be reused so:-
+ // - initial temp file has to be read 10 times
+ // - DummyLoadStoreFunc has to be written 10 times
+
+ assertEquals(10, DummyLoadFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ;
+ assertEquals(10, DummyStoreFunc.writeCounter) ;
+
+ pig.deleteFile(file1) ;
+ pig.deleteFile(file2) ;
+
+ }
+
+ public void runReuseTest(ExecType runType) throws Exception {
+
+ DummyLoadStoreFunc.readCounterMap = null ;
+ DummyLoadStoreFunc.writeCounter = 0 ;
+
+ File tmpFile = createTempFile() ;
+
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ pig.registerQuery("A = LOAD 'file:" + tmpFile.getAbsolutePath() + "' USING "
+ + DummyLoadStoreFunc.class.getName() + "();");
+
+ String file1 = "/tmp/testPigOutput" ;
+ if (pig.existsFile(file1)) {
+ pig.deleteFile(file1) ;
+ }
+
+ pig.store("A", file1, DummyLoadStoreFunc.class.getName() + "()");
+
+ String file2 = "/tmp/testPigOutput2" ;
+ if (pig.existsFile(file2)) {
+ pig.deleteFile(file2) ;
+ }
+ pig.store("A", file2, DummyLoadStoreFunc.class.getName() + "()");
+
+ // for this test the plan will be reused so:-
+ // - initial temp file has to be read 5 times
+ // - the output of the first execution has to be read 5 times
+ // - DummyLoadStoreFunc has to be written 10 times
+
+ assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ;
+ assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("/tmp/testPigOutput").intValue()) ;
+ assertEquals(10, DummyLoadStoreFunc.writeCounter) ;
+
+
+ pig.deleteFile(file1) ;
+ pig.deleteFile(file2) ;
+
+ }
+
+ private File createTempFile() throws Exception {
+ File tmpFile = File.createTempFile("test", ".txt");
+ if (tmpFile.exists()) {
+ tmpFile.delete() ;
+ }
+ PrintWriter pw = new PrintWriter(tmpFile) ;
+ pw.println("1,11,111,1111") ;
+ pw.println("2,22,222,2222") ;
+ pw.println("3,33,333,3333") ;
+ pw.println("4,4,444,4444") ;
+ pw.println("5,55,555,5555") ;
+ pw.close() ;
+ tmpFile.deleteOnExit() ;
+ return tmpFile ;
+ }
+
+ public static class DummyLoadStoreFunc implements ReversibleLoadStoreFunc {
+
+ public static Map<String,Integer> readCounterMap = null ;
+
+ protected BufferedPositionedInputStream in = null;
+ private String fileName = null ;
+
+ public void bindTo(String inputfileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ in = is ;
+ fileName = inputfileName ;
+ }
+
+ public Tuple getNext() throws IOException {
+ String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ;
+ if (line == null) {
+ return null ;
+ }
+ // else
+
+ if (readCounterMap == null) {
+ readCounterMap = new HashMap<String,Integer>() ;
+ }
+
+ if (readCounterMap.get(fileName) == null) {
+ readCounterMap.put(fileName, 1) ;
+ }
+ else {
+ readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ;
+ }
+
+ return new Tuple(line, ",");
+ }
+
+ public static int writeCounter = 0 ;
+ private PrintWriter pw = null ;
+
+ public void bindTo(OutputStream os) throws IOException {
+ pw = new PrintWriter(os) ;
+ }
+
+ public void finish() throws IOException {
+ pw.close() ;
+ }
+
+ public void putNext(Tuple tuple) throws IOException {
+ writeCounter++ ;
+ pw.println(tuple.toDelimitedString(","));
+ }
+
+ }
+
+ public static class DummyLoadFunc implements LoadFunc {
+
+ public static Map<String,Integer> readCounterMap = null ;
+
+ protected BufferedPositionedInputStream in = null;
+ private String fileName = null ;
+
+ public void bindTo(String inputfileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ in = is ;
+ fileName = inputfileName ;
+ }
+
+ public Tuple getNext() throws IOException {
+ String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ;
+ if (line == null) {
+ return null ;
+ }
+ // else
+
+ if (readCounterMap == null) {
+ readCounterMap = new HashMap<String,Integer>() ;
+ }
+
+ if (readCounterMap.get(fileName) == null) {
+ readCounterMap.put(fileName, 1) ;
+ }
+ else {
+ readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ;
+ }
+
+ return new Tuple(line, ",");
+ }
+
+ }
+
+ public static class DummyStoreFunc implements StoreFunc {
+
+ public static int writeCounter = 0 ;
+ private PrintWriter pw = null ;
+
+ public void bindTo(OutputStream os) throws IOException {
+ pw = new PrintWriter(os) ;
+ }
+
+ public void finish() throws IOException {
+ pw.close() ;
+ }
+
+ public void putNext(Tuple tuple) throws IOException {
+ writeCounter++ ;
+ pw.println(tuple.toDelimitedString(","));
+ }
+ }
+
+}