You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/04/17 17:47:13 UTC

svn commit: r649154 - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/local/executionengine/ src/org/apache/pig/builtin/ test/org/apache/pig/test/

Author: gates
Date: Thu Apr 17 08:47:08 2008
New Revision: 649154

URL: http://svn.apache.org/viewvc?rev=649154&view=rev
Log:
PIG-114: store one alias/logicalPlan twice leads to instantiation of StoreFunc as LoadFunc.


Added:
    incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java
    incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
    incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Apr 17 08:47:08 2008
@@ -237,3 +237,6 @@
 
 	PIG-183:  Catch when a UDF has been compiled with the wrong version of
 	java and give a RuntimeException (pi_song via gates).
+
+	PIG-114: store one alias/logicalPlan twice leads to instantiation of
+	StoreFunc as LoadFunc (pi_song via gates).

Added: incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java?rev=649154&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java Thu Apr 17 08:47:08 2008
@@ -0,0 +1,14 @@
+package org.apache.pig;
+
+/**
+ * This interface is used to implement classes that can perform both
+ * Load and Store functionalities in a symmetric fashion (thus reversible). 
+ * 
+ * The symmetry property of implementations is relied upon by the
+ * optimization engine; therefore, violating this property while implementing
+ * this interface is likely to result in unexpected output from executions.
+ * 
+ */
+public interface ReversibleLoadStoreFunc extends LoadFunc, StoreFunc {
+
+}

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu Apr 17 08:47:08 2008
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Iterator;
 
+import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.FunctionInstantiator;
@@ -83,16 +84,18 @@
         MapRedResult materializedResult = materializedResults.get(logicalKey);
         
         if (materializedResult != null) {
-            POMapreduce pom = new POMapreduce(logicalKey.getScope(),
-                                              nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
-                                              execEngine.getPhysicalOpTable(),
-                                              logicalKey,
-                                              pigContext);
+            if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec()) 
+                    instanceof ReversibleLoadStoreFunc) {
+                POMapreduce pom = new POMapreduce(logicalKey.getScope(),
+                    nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                    execEngine.getPhysicalOpTable(), logicalKey,
+                    pigContext);
 
-            pom.addInputFile(materializedResult.outFileSpec);
-            pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
+            	pom.addInputFile(materializedResult.outFileSpec);
+            	pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
 
-            return pom.getOperatorKey();            
+            	return pom.getOperatorKey();            
+			}
         }
         
         // first, compile inputs into MapReduce operators

Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Apr 17 08:47:08 2008
@@ -6,6 +6,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
+import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.BagFactory;
@@ -161,16 +162,20 @@
         LocalResult materializedResult = materializedResults.get(logicalKey);
         
         if (materializedResult != null) {
-            ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(),
-                                             nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
-                                             physicalOpTable,
-                                             pigContext, 
-                                             materializedResult.outFileSpec,
-                                             LogicalOperator.FIXED);
             
-            OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId());
-            
-            return ppKey;
+            if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec()) 
+                                                            instanceof ReversibleLoadStoreFunc) {
+                ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(),
+                            nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                            physicalOpTable,
+                            pigContext, 
+                            materializedResult.outFileSpec,
+                            LogicalOperator.FIXED);
+
+               OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId());
+               return ppKey;          
+            }
+
         }
 
         OperatorKey physicalKey = new OperatorKey();

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Thu Apr 17 08:47:08 2008
@@ -24,13 +24,11 @@
 import java.io.OutputStream;
 import java.util.Iterator;
 
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
-
-public class BinStorage implements LoadFunc, StoreFunc {
+public class BinStorage implements ReversibleLoadStoreFunc {
     Iterator<Tuple>     i              = null;
     protected BufferedPositionedInputStream in = null;
     private DataInputStream inData = null;

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=649154&r1=649153&r2=649154&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Apr 17 08:47:08 2008
@@ -21,8 +21,7 @@
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
@@ -32,7 +31,7 @@
  * delimiter is given as a regular expression. See String.split(delimiter) and
  * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
  */
-public class PigStorage implements LoadFunc, StoreFunc {
+public class PigStorage implements ReversibleLoadStoreFunc {
     protected BufferedPositionedInputStream in = null;
     long                end            = Long.MAX_VALUE;
     private byte recordDel = (byte)'\n';

Added: incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java?rev=649154&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java Thu Apr 17 08:47:08 2008
@@ -0,0 +1,241 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map; 
+import java.util.HashMap;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+public class TestReversibleLoadStore extends TestCase {
+
+    static List<Tuple> _storedTuples = new ArrayList<Tuple>();
+
+    public void testLocalNoReuse() throws Exception {
+        runNoReuseTest(ExecType.LOCAL) ;
+    }
+    
+    public void testMapReduceNoReuse() throws Exception {
+        runNoReuseTest(ExecType.MAPREDUCE) ;
+    }
+    
+    public void testLocalReuse() throws Exception {
+        runReuseTest(ExecType.LOCAL) ;
+    }
+    
+    public void testMapReduceReuse() throws Exception {
+        runReuseTest(ExecType.MAPREDUCE) ;
+    }
+    
+    public void runNoReuseTest(ExecType runType) throws Exception {
+        
+        DummyLoadFunc.readCounterMap = null ;
+        DummyStoreFunc.writeCounter = 0 ;     
+        
+        File tmpFile = createTempFile() ;
+        
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        pig.registerQuery("A = LOAD 'file:" + tmpFile.getAbsolutePath() + "' USING "
+                        + DummyLoadFunc.class.getName() + "();");
+
+        String file1 = "/tmp/testPigOutput" ;
+        if (pig.existsFile(file1)) {
+            pig.deleteFile(file1) ;
+        }
+        
+        pig.store("A", file1, DummyStoreFunc.class.getName() + "()");
+        
+        String file2 = "/tmp/testPigOutput2" ;
+        if (pig.existsFile(file2)) {
+            pig.deleteFile(file2) ;
+        }
+        pig.store("A", file2, DummyStoreFunc.class.getName() + "()");
+        
+        // for this test the plan will not be reused, so:
+        // - 10 tuples have to be read from the initial temp file
+        // - 10 tuples have to be written by DummyStoreFunc
+        
+        assertEquals(10, DummyLoadFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ;
+        assertEquals(10, DummyStoreFunc.writeCounter) ;
+        
+        pig.deleteFile(file1) ;
+        pig.deleteFile(file2) ;
+        
+    }
+    
+    public void runReuseTest(ExecType runType) throws Exception {
+        
+        DummyLoadStoreFunc.readCounterMap = null ;
+        DummyLoadStoreFunc.writeCounter = 0 ;     
+        
+        File tmpFile = createTempFile() ;
+        
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        pig.registerQuery("A = LOAD 'file:" + tmpFile.getAbsolutePath() + "' USING "
+                        + DummyLoadStoreFunc.class.getName() + "();");
+
+        String file1 = "/tmp/testPigOutput" ;
+        if (pig.existsFile(file1)) {
+            pig.deleteFile(file1) ;
+        }
+        
+        pig.store("A", file1, DummyLoadStoreFunc.class.getName() + "()");
+        
+        String file2 = "/tmp/testPigOutput2" ;
+        if (pig.existsFile(file2)) {
+            pig.deleteFile(file2) ;
+        }
+        pig.store("A", file2, DummyLoadStoreFunc.class.getName() + "()");
+        
+        // for this test the plan will be reused, so:
+        // - 5 tuples have to be read from the initial temp file
+        // - 5 tuples have to be read from the output of the first execution
+        // - 10 tuples have to be written by DummyLoadStoreFunc
+        
+        assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ;
+        assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("/tmp/testPigOutput").intValue()) ;
+        assertEquals(10, DummyLoadStoreFunc.writeCounter) ;
+        
+        
+        pig.deleteFile(file1) ;
+        pig.deleteFile(file2) ;
+        
+    }
+    
+    private File createTempFile() throws Exception {
+        File tmpFile =  File.createTempFile("test", ".txt");
+        if (tmpFile.exists()) {
+            tmpFile.delete() ;
+        }
+        PrintWriter pw = new PrintWriter(tmpFile) ;
+        pw.println("1,11,111,1111") ;
+        pw.println("2,22,222,2222") ;
+        pw.println("3,33,333,3333") ;
+        pw.println("4,4,444,4444") ;
+        pw.println("5,55,555,5555") ;
+        pw.close() ;
+        tmpFile.deleteOnExit() ;
+        return tmpFile ;
+    }
+    
+    public static class DummyLoadStoreFunc implements ReversibleLoadStoreFunc {
+        
+       public static Map<String,Integer> readCounterMap = null ;
+        
+        protected BufferedPositionedInputStream in = null;
+        private String fileName = null ;
+        
+        public void bindTo(String inputfileName, BufferedPositionedInputStream is,
+                long offset, long end) throws IOException {
+            in = is ;
+            fileName = inputfileName ;
+        }
+
+        public Tuple getNext() throws IOException {
+            String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ;
+            if (line == null) {
+                return null ;
+            }
+            // else
+            
+            if (readCounterMap == null) {
+                readCounterMap = new HashMap<String,Integer>() ;
+            }
+            
+            if (readCounterMap.get(fileName) == null) {
+                readCounterMap.put(fileName, 1) ;
+            }
+            else {
+                readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ;
+            }
+            
+            return new Tuple(line, ",");
+        }
+        
+        public static int writeCounter = 0 ;
+        private PrintWriter pw = null ;
+        
+        public void bindTo(OutputStream os) throws IOException {
+            pw = new PrintWriter(os) ;
+        }
+
+        public void finish() throws IOException {
+            pw.close() ;
+        }
+
+        public void putNext(Tuple tuple) throws IOException {
+            writeCounter++ ;
+            pw.println(tuple.toDelimitedString(","));            
+        }
+        
+    }
+    
+    public static class DummyLoadFunc implements LoadFunc {
+
+        public static Map<String,Integer> readCounterMap = null ;
+        
+        protected BufferedPositionedInputStream in = null;
+        private String fileName = null ;
+        
+        public void bindTo(String inputfileName, BufferedPositionedInputStream is,
+                long offset, long end) throws IOException {
+            in = is ;
+            fileName = inputfileName ;
+        }
+
+        public Tuple getNext() throws IOException {
+            String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ;
+            if (line == null) {
+                return null ;
+            }
+            // else
+            
+            if (readCounterMap == null) {
+                readCounterMap = new HashMap<String,Integer>() ;
+            }
+            
+            if (readCounterMap.get(fileName) == null) {
+                readCounterMap.put(fileName, 1) ;
+            }
+            else {
+                readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ;
+            }
+            
+            return new Tuple(line, ",");
+        }
+
+    }
+    
+    public static class DummyStoreFunc implements StoreFunc {
+        
+        public static int writeCounter = 0 ;
+        private PrintWriter pw = null ;
+        
+        public void bindTo(OutputStream os) throws IOException {
+            pw = new PrintWriter(os) ;
+        }
+
+        public void finish() throws IOException {
+            pw.close() ;
+        }
+
+        public void putNext(Tuple tuple) throws IOException {
+            writeCounter++ ;
+            pw.println(tuple.toDelimitedString(","));            
+        }
+    }
+
+}