You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/26 21:30:23 UTC
svn commit: r689177 - in /incubator/pig/branches/types:
src/org/apache/pig/PigServer.java
src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
test/org/apache/pig/test/TestFilterUDF.java
test/org/apache/pig/test/TestSplitStore.java
Author: olga
Date: Tue Aug 26 12:30:22 2008
New Revision: 689177
URL: http://svn.apache.org/viewvc?rev=689177&view=rev
Log:
PIG-370: split followed by dump is broken
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java
Modified:
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=689177&r1=689176&r2=689177&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue Aug 26 12:30:22 2008
@@ -41,6 +41,7 @@
import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -307,7 +308,8 @@
if(null == op) {
throw new IOException("Unable to find an operator for alias " + id);
}
- ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+// ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+ ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
return job.getResults();
@@ -328,14 +330,14 @@
* @throws IOException
*/
- public void store(String id, String filename) throws IOException {
- store(id, filename, PigStorage.class.getName() + "()"); // SFPig is the default store function
+ public ExecJob store(String id, String filename) throws IOException {
+ return store(id, filename, PigStorage.class.getName() + "()"); // SFPig is the default store function
}
/**
* forces execution of query (and all queries from which it reads), in order to store result in file
*/
- public void store(
+ public ExecJob store(
String id,
String filename,
String func) throws IOException{
@@ -344,13 +346,13 @@
try {
LogicalPlan readFrom = getPlanFromAlias(id, "store");
- store(id, readFrom, filename, func);
+ return store(id, readFrom, filename, func);
} catch (FrontendException fe) {
throw WrappedIOException.wrap("Unable to store alias " + id, fe);
}
}
- public void store(
+ public ExecJob store(
String id,
LogicalPlan readFrom,
String filename,
@@ -358,7 +360,7 @@
try {
LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
scope, readFrom, filename, func, aliasOp.get(id), aliases);
- execute(storePlan);
+ return execute(storePlan);
} catch (Exception e) {
throw WrappedIOException.wrap("Unable to store for alias: " +
id, e);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=689177&r1=689176&r2=689177&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Tue Aug 26 12:30:22 2008
@@ -70,6 +70,7 @@
if (!mIsSchemaComputed) {
// get our parent's schema
Collection<LogicalOperator> s = mPlan.getPredecessors(this);
+ if(s==null) return null;
try {
LogicalOperator op = s.iterator().next();
if (null == op) {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=689177&r1=689176&r2=689177&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Tue Aug 26 12:30:22 2008
@@ -24,10 +24,23 @@
public class TestFilterUDF extends TestCase {
private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+ private File tmpFile;
+
+ public TestFilterUDF() throws ExecException, IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ int LOOP_SIZE = 20;
+ tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ ps.println(i);
+ }
+ ps.close();
+ }
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.LOCAL);
+
}
@After
@@ -53,13 +66,7 @@
@Test
public void testFilterUDF() throws Exception{
- int LOOP_SIZE = 20;
- File tmpFile = File.createTempFile("test", "txt");
- PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- for(int i = 1; i <= LOOP_SIZE; i++) {
- ps.println(i);
- }
- ps.close();
+
pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:int);");
pigServer.registerQuery("B = filter A by " + MyFilterFunction.class.getName() + "();");
Iterator<Tuple> iter = pigServer.openIterator("B");
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java?rev=689177&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestSplitStore.java Tue Aug 26 12:30:22 2008
@@ -0,0 +1,117 @@
+package org.apache.pig.test;
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestSplitStore extends TestCase{ // PIG-370 regression tests: SPLIT outputs consumed by store/openIterator in different orders.
+ private PigServer pig;
+ private PigContext pigContext;
+ private File tmpFile;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+ // Constructor: creates a MAPREDUCE-mode PigServer from the MiniCluster's properties and writes the input file (1..LOOP_SIZE).
+ public TestSplitStore() throws ExecException, IOException{
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigContext = pig.getPigContext(); // only used below to generate temporary output paths
+ int LOOP_SIZE = 20;
+ tmpFile = File.createTempFile("test", "txt"); // NOTE(review): suffix has no leading "."; the file is never deleted by this class
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); // NOTE(review): PrintStream swallows write errors and checkError() is never consulted
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ ps.println(i); // one integer per line; the split in each test partitions on $0
+ }
+ ps.close();
+ }
+ @Before // NOTE(review): class extends JUnit 3 TestCase, so these JUnit 4 annotations are probably ignored; setUp/tearDown run by name
+ public void setUp() throws Exception {
+ // Empty: all fixtures are created in the constructor.
+ }
+ // NOTE(review): tearDown is empty, so tmpFile and the temporary store outputs are not cleaned up by this class.
+ @After
+ public void tearDown() throws Exception {
+ }
+ // test1: store() both split branches. NOTE(review): test1-test9 contain no assertions; they pass as long as nothing throws.
+ @Test
+ public void test1() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.store("A1", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'"); // NOTE(review): path is wrapped in literal quotes; confirm store() expects that (test6-test9 quote inside Pig Latin instead)
+ pig.store("A2", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ }
+ // test2: openIterator on A1 (result discarded), then store() A2.
+ @Test
+ public void test2() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.openIterator("A1");
+ pig.store("A2", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ }
+ // test3: openIterator on A2 (result discarded), then store() A1.
+ @Test
+ public void test3() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.openIterator("A2");
+ pig.store("A1", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ }
+ // test4: store() A1, then openIterator on A2.
+ @Test
+ public void test4() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.store("A1", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.openIterator("A2");
+ }
+ // test5: store() A2, then openIterator on A1.
+ @Test
+ public void test5() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.store("A2", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.openIterator("A1");
+ }
+ // test6: openIterator on A1, then a Pig Latin STORE of A2 via registerQuery.
+ @Test
+ public void test6() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.openIterator("A1");
+ pig.registerQuery("Store A2 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ }
+ // test7: openIterator on A2, then a Pig Latin STORE of A1 via registerQuery.
+ @Test
+ public void test7() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.openIterator("A2");
+ pig.registerQuery("Store A1 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ }
+ // test8: Pig Latin STORE of A1 via registerQuery, then openIterator on A2.
+ @Test
+ public void test8() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.registerQuery("Store A1 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ pig.openIterator("A2");
+ }
+ // test9: Pig Latin STORE of A2 via registerQuery, then openIterator on A1.
+ @Test
+ public void test9() throws Exception{
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
+ pig.registerQuery("Store A2 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ pig.openIterator("A1");
+ }
+}