You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/26 21:30:23 UTC

svn commit: r689177 - in /incubator/pig/branches/types: src/org/apache/pig/PigServer.java src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java test/org/apache/pig/test/TestFilterUDF.java test/org/apache/pig/test/TestSplitStore.java

Author: olga
Date: Tue Aug 26 12:30:22 2008
New Revision: 689177

URL: http://svn.apache.org/viewvc?rev=689177&view=rev
Log:
PIG-370: split followed by dump is broken

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=689177&r1=689176&r2=689177&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue Aug 26 12:30:22 2008
@@ -41,6 +41,7 @@
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -307,7 +308,8 @@
             if(null == op) {
                 throw new IOException("Unable to find an operator for alias " + id);
             }
-            ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+//            ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+            ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
             // invocation of "execute" is synchronous!
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
                     return job.getResults();
@@ -328,14 +330,14 @@
      * @throws IOException
      */
 
-    public void store(String id, String filename) throws IOException {
-        store(id, filename, PigStorage.class.getName() + "()");   // SFPig is the default store function
+    public ExecJob store(String id, String filename) throws IOException {
+        return store(id, filename, PigStorage.class.getName() + "()");   // SFPig is the default store function
     }
         
     /**
      *  forces execution of query (and all queries from which it reads), in order to store result in file
      */
-    public void store(
+    public ExecJob store(
             String id,
             String filename,
             String func) throws IOException{
@@ -344,13 +346,13 @@
         
         try {
             LogicalPlan readFrom = getPlanFromAlias(id, "store");
-            store(id, readFrom, filename, func);
+            return store(id, readFrom, filename, func);
         } catch (FrontendException fe) {
             throw WrappedIOException.wrap("Unable to store alias " + id, fe);
         }
     }
         
-    public void store(
+    public ExecJob store(
             String id,
             LogicalPlan readFrom,
             String filename,
@@ -358,7 +360,7 @@
         try {
             LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
                 scope, readFrom, filename, func, aliasOp.get(id), aliases);
-            execute(storePlan);
+            return execute(storePlan);
         } catch (Exception e) {
             throw WrappedIOException.wrap("Unable to store for alias: " +
                 id, e);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=689177&r1=689176&r2=689177&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Tue Aug 26 12:30:22 2008
@@ -70,6 +70,7 @@
         if (!mIsSchemaComputed) {
             // get our parent's schema
             Collection<LogicalOperator> s = mPlan.getPredecessors(this);
+            if(s==null) return null;
             try {
                 LogicalOperator op = s.iterator().next();
                 if (null == op) {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=689177&r1=689176&r2=689177&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Tue Aug 26 12:30:22 2008
@@ -24,10 +24,23 @@
 
 public class TestFilterUDF extends TestCase {
     private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    private File tmpFile;
+    
+    public TestFilterUDF() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        int LOOP_SIZE = 20;
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            ps.println(i);
+        }
+        ps.close();
+    }
     
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.LOCAL);
+        
     }
 
     @After
@@ -53,13 +66,7 @@
     
     @Test
     public void testFilterUDF() throws Exception{
-        int LOOP_SIZE = 20;
-        File tmpFile = File.createTempFile("test", "txt");
-        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
-        for(int i = 1; i <= LOOP_SIZE; i++) {
-            ps.println(i);
-        }
-        ps.close();
+        
         pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:int);");
         pigServer.registerQuery("B = filter A by " + MyFilterFunction.class.getName() + "();");
         Iterator<Tuple> iter = pigServer.openIterator("B");

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java?rev=689177&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java Tue Aug 26 12:30:22 2008
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Regression tests for PIG-370 (a SPLIT followed by a dump or store was broken).
+ * <p>
+ * Each test loads the integers 1..20 (one per line), splits them into
+ * {@code A1} ({@code $0 <= 10}) and {@code A2} ({@code $0 > 10}), and then
+ * materializes both branches in a different order, mixing
+ * {@link PigServer#openIterator(String)}, {@link PigServer#store(String, String)}
+ * and STORE statements issued through {@link PigServer#registerQuery(String)}.
+ */
+public class TestSplitStore extends TestCase {
+    /** Number of rows (the integers 1..LOOP_SIZE) written to the input file. */
+    private static final int LOOP_SIZE = 20;
+    /** Rows with {@code $0 <= SPLIT_POINT} go to A1; the remaining rows go to A2. */
+    private static final int SPLIT_POINT = 10;
+    private static final int A1_ROWS = SPLIT_POINT;
+    private static final int A2_ROWS = LOOP_SIZE - SPLIT_POINT;
+
+    private PigServer pig;
+    private PigContext pigContext;
+    private File tmpFile;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+
+    public TestSplitStore() throws ExecException, IOException {
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigContext = pig.getPigContext();
+    }
+
+    /**
+     * Writes the input file. Creating it here, instead of in the constructor,
+     * pairs every file with the tearDown call that deletes it.
+     */
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        tmpFile = File.createTempFile("test", ".txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        try {
+            for (int i = 1; i <= LOOP_SIZE; i++) {
+                ps.println(i);
+            }
+        } finally {
+            ps.close();
+        }
+    }
+
+    /** Deletes the input file written by setUp. */
+    @After
+    public void tearDown() throws Exception {
+        if (tmpFile != null) {
+            tmpFile.delete();
+        }
+        super.tearDown();
+    }
+
+    /** Loads the input file as A and splits it into A1 and A2. */
+    private void registerSplit() throws IOException {
+        pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pig.registerQuery("Split A into A1 if $0<=" + SPLIT_POINT
+                + ", A2 if $0>" + SPLIT_POINT + ";");
+    }
+
+    /** Returns a new temporary output location as a plain (unquoted) path. */
+    private String newOutputPath() throws IOException {
+        return FileLocalizer.getTemporaryPath(null, pigContext).toString();
+    }
+
+    /**
+     * Stores an alias through {@link PigServer#store(String, String)} and
+     * checks that the returned job completed. That API takes the path as-is
+     * (PigServer itself passes an unquoted temporary path when dumping), so
+     * unlike a STORE statement the path must not be wrapped in quotes.
+     */
+    private void storeWithApi(String alias) throws IOException {
+        ExecJob job = pig.store(alias, newOutputPath());
+        assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+    }
+
+    /** Stores an alias with a STORE statement, which needs a quoted path. */
+    private void storeWithQuery(String alias) throws IOException {
+        pig.registerQuery("Store " + alias + " into '" + newOutputPath() + "';");
+    }
+
+    /** Dumps an alias and checks how many tuples it produced. */
+    private void dump(String alias, int expectedRows) throws IOException {
+        Iterator<Tuple> it = pig.openIterator(alias);
+        int rows = 0;
+        while (it.hasNext()) {
+            it.next();
+            rows++;
+        }
+        assertEquals("row count of " + alias, expectedRows, rows);
+    }
+
+    /** Stores both branches through the PigServer API. */
+    @Test
+    public void test1() throws Exception {
+        registerSplit();
+        storeWithApi("A1");
+        storeWithApi("A2");
+    }
+
+    /** Dumps A1, then stores A2 through the API. */
+    @Test
+    public void test2() throws Exception {
+        registerSplit();
+        dump("A1", A1_ROWS);
+        storeWithApi("A2");
+    }
+
+    /** Dumps A2, then stores A1 through the API. */
+    @Test
+    public void test3() throws Exception {
+        registerSplit();
+        dump("A2", A2_ROWS);
+        storeWithApi("A1");
+    }
+
+    /** Stores A1 through the API, then dumps A2. */
+    @Test
+    public void test4() throws Exception {
+        registerSplit();
+        storeWithApi("A1");
+        dump("A2", A2_ROWS);
+    }
+
+    /** Stores A2 through the API, then dumps A1. */
+    @Test
+    public void test5() throws Exception {
+        registerSplit();
+        storeWithApi("A2");
+        dump("A1", A1_ROWS);
+    }
+
+    /** Dumps A1, then stores A2 with a STORE statement. */
+    @Test
+    public void test6() throws Exception {
+        registerSplit();
+        dump("A1", A1_ROWS);
+        storeWithQuery("A2");
+    }
+
+    /** Dumps A2, then stores A1 with a STORE statement. */
+    @Test
+    public void test7() throws Exception {
+        registerSplit();
+        dump("A2", A2_ROWS);
+        storeWithQuery("A1");
+    }
+
+    /** Stores A1 with a STORE statement, then dumps A2. */
+    @Test
+    public void test8() throws Exception {
+        registerSplit();
+        storeWithQuery("A1");
+        dump("A2", A2_ROWS);
+    }
+
+    /** Stores A2 with a STORE statement, then dumps A1. */
+    @Test
+    public void test9() throws Exception {
+        registerSplit();
+        storeWithQuery("A2");
+        dump("A1", A1_ROWS);
+    }
+}