You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/01/08 19:17:12 UTC
svn commit: r897283 [5/5] - in /hadoop/pig/branches/load-store-redesign: ./
contrib/piggybank/java/ contrib/zebra/
contrib/zebra/src/java/org/apache/hadoop/zebra/pig/
contrib/zebra/src/java/org/apache/hadoop/zebra/types/
contrib/zebra/src/test/e2e/merg...
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java Fri Jan 8 18:17:07 2010
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+public class POSplitOutput extends PhysicalOperator {
+
+ /**
+ * POSplitOutput reads from POSplit using an iterator
+ */
+ private static final long serialVersionUID = 1L;
+
+ // Leaf operator of the condition plan; evaluated once per input tuple to
+ // decide whether the tuple belongs to this split output.
+ PhysicalOperator compOp;
+ // Condition plan set via setPlan(); input tuples are attached to it before
+ // asking compOp for a boolean verdict.
+ PhysicalPlan compPlan;
+ // Iterator over the tuples handed out by the upstream POSplit.
+ // transient: rebuilt lazily on first getNext() after deserialization.
+ transient Iterator<Tuple> it;
+
+ public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POSplitOutput(OperatorKey k, int rp) {
+ super(k, rp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
+ super(k, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POSplitOutput(OperatorKey k) {
+ super(k);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ // TODO Auto-generated method stub
+ // Intentionally a no-op: this operator is not visited by the plan visitors.
+
+ }
+
+ // Pulls tuples from the upstream POSplit's iterator and returns the first
+ // one that satisfies the condition plan; returns EOP when the iterator is
+ // exhausted, and propagates any non-OK/non-NULL status from the condition.
+ @SuppressWarnings("unchecked")
+ public Result getNext(Tuple t) throws ExecException {
+ if(it == null) {
+ // First call: fetch the iterator published by POSplit as its result.
+ // NOTE(review): local 'op' is unused; also, if res.returnStatus is not
+ // STATUS_OK here, 'it' stays null and it.hasNext() below will NPE —
+ // presumably POSplit always returns OK on first call; verify.
+ PhysicalOperator op = getInputs().get(0);
+ Result res = getInputs().get(0).getNext(t);
+ if(res.returnStatus == POStatus.STATUS_OK)
+ it = (Iterator<Tuple>) res.result;
+ }
+ Result res = null;
+ Result inp = new Result();
+ while(true) {
+ // Advance the split iterator; end of iterator means end of this output.
+ if(it.hasNext())
+ inp.result = it.next();
+ else {
+ inp.returnStatus = POStatus.STATUS_EOP;
+ return inp;
+ }
+ inp.returnStatus = POStatus.STATUS_OK;
+
+ // Evaluate the split condition against the current tuple.
+ compPlan.attachInput((Tuple) inp.result);
+
+ res = compOp.getNext(dummyBool);
+ // Propagate errors/EOP from the condition plan; STATUS_NULL just means
+ // the condition evaluated to null, which we treat as "does not match".
+ if (res.returnStatus != POStatus.STATUS_OK
+ && res.returnStatus != POStatus.STATUS_NULL)
+ return res;
+
+ if (res.result != null && (Boolean) res.result == true) {
+ // Tuple matches this split branch; record lineage for example
+ // generation (illustrate) when a tracer is installed.
+ if(lineageTracer != null) {
+ ExampleTuple tIn = (ExampleTuple) inp.result;
+ lineageTracer.insert(tIn);
+ lineageTracer.union(tIn, tIn);
+ }
+ return inp;
+ }
+ }
+
+ }
+
+ @Override
+ public String name() {
+ // TODO Auto-generated method stub
+ return "POSplitOutput " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ // Installs the condition plan; the plan's single leaf is the operator that
+ // yields the boolean verdict consumed by getNext().
+ public void setPlan(PhysicalPlan compPlan) {
+ this.compPlan = compPlan;
+ this.compOp = compPlan.getLeaves().get(0);
+ }
+
+}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java Fri Jan 8 18:17:07 2010
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+public class POStreamLocal extends POStream {
+
+ /**
+ * Local-execution variant of POStream: pull-based (no map/reduce push model),
+ * so EOP from the predecessor is the signal to close the binary's stdin.
+ */
+ private static final long serialVersionUID = 2L;
+
+ public POStreamLocal(OperatorKey k, ExecutableManager executableManager,
+ StreamingCommand command, Properties properties) {
+ super(k, executableManager, command, properties);
+ // TODO Auto-generated constructor stub
+ }
+
+
+ /**
+ * This is different from the Map-Reduce implementation of the POStream since there is no
+ * push model here. POStatus_EOP signals the end of input and can be used to decide when
+ * to stop the stdin to the process
+ */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ // The POStream Operator works with ExecutableManager to
+ // send input to the streaming binary and to get output
+ // from it. To achieve a tuple oriented behavior, two queues
+ // are used - one for output from the binary and one for
+ // input to the binary. In each getNext() call:
+ // 1) If there is no more output expected from the binary, an EOP is
+ // sent to successor
+ // 2) If there is any output from the binary in the queue, it is passed
+ // down to the successor
+ // 3) if neither of these two are true and if it is possible to
+ // send input to the binary, then the next tuple from the
+ // predecessor is got and passed to the binary
+ try {
+ // if we are being called AFTER all output from the streaming
+ // binary has already been sent to us then just return EOP
+ // The "allOutputFromBinaryProcessed" flag is set when we see
+ // an EOS (End of Stream output) from streaming binary
+ if(allOutputFromBinaryProcessed) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ // if we are here AFTER all map() calls have been completed
+ // AND AFTER we process all possible input to be sent to the
+ // streaming binary, then all we want to do is read output from
+ // the streaming binary
+ if(allInputFromPredecessorConsumed) {
+ // Blocking take: waits for the binary's output handler thread.
+ Result r = binaryOutputQueue.take();
+ if(r.returnStatus == POStatus.STATUS_EOS) {
+ // If we received EOS, it means all output
+ // from the streaming binary has been sent to us
+ // So we can send an EOP to the successor in
+ // the pipeline. Also since we are being called
+ // after all input from predecessor has been processed
+ // it means we got here from a call from close() in
+ // map or reduce. So once we send this EOP down,
+ // getNext() in POStream should never be called. So
+ // we don't need to set any flag noting we saw all output
+ // from binary
+ r.returnStatus = POStatus.STATUS_EOP;
+ }
+ return(r);
+ }
+
+ // if we are here, we haven't consumed all input to be sent
+ // to the streaming binary - check if we are being called
+ // from close() on the map or reduce
+ // NOTE(review): the endOfAllInput branch of the MR version is commented
+ // out below; in local mode EOP from getNextHelper() is the end signal.
+ //if(this.parentPlan.endOfAllInput) {
+ Result r = getNextHelper(t);
+ if(r.returnStatus == POStatus.STATUS_EOP) {
+ // we have now seen *ALL* possible input
+ // check if we ever had any real input
+ // in the course of the map/reduce - if we did
+ // then "initialized" will be true. If not, just
+ // send EOP down.
+ if(initialized) {
+ // signal End of ALL input to the Executable Manager's
+ // Input handler thread
+ binaryInputQueue.put(r);
+ // note this state for future calls
+ allInputFromPredecessorConsumed = true;
+ // look for output from binary
+ r = binaryOutputQueue.take();
+ if(r.returnStatus == POStatus.STATUS_EOS) {
+ // If we received EOS, it means all output
+ // from the streaming binary has been sent to us
+ // So we can send an EOP to the successor in
+ // the pipeline. Also since we are being called
+ // after all input from predecessor has been processed
+ // it means we got here from a call from close() in
+ // map or reduce. So once we send this EOP down,
+ // getNext() in POStream should never be called. So
+ // we don't need to set any flag noting we saw all output
+ // from binary
+ r.returnStatus = POStatus.STATUS_EOP;
+ }
+ }
+
+ } else if(r.returnStatus == POStatus.STATUS_EOS) {
+ // If we received EOS, it means all output
+ // from the streaming binary has been sent to us
+ // So we can send an EOP to the successor in
+ // the pipeline. Also we are being called
+ // from close() in map or reduce (this is so because
+ // only then this.parentPlan.endOfAllInput is true).
+ // So once we send this EOP down, getNext() in POStream
+ // should never be called. So we don't need to set any
+ // flag noting we saw all output from binary
+ r.returnStatus = POStatus.STATUS_EOP;
+ }
+ return r;
+// } else {
+// // we are not being called from close() - so
+// // we must be called from either map() or reduce()
+// // get the next Result from helper
+// Result r = getNextHelper(t);
+// if(r.returnStatus == POStatus.STATUS_EOS) {
+// // If we received EOS, it means all output
+// // from the streaming binary has been sent to us
+// // So we can send an EOP to the successor in
+// // the pipeline and also note this condition
+// // for future calls
+// r.returnStatus = POStatus.STATUS_EOP;
+// allOutputFromBinaryProcessed = true;
+// }
+// return r;
+// }
+
+ } catch(Exception e) {
+ // Wraps InterruptedException from the blocking queues as well as any
+ // ExecutableManager failure.
+ // NOTE(review): on InterruptedException the interrupt flag is not
+ // restored (Thread.currentThread().interrupt()) — confirm intended.
+ throw new ExecException("Error while trying to get next result in POStream", e);
+ }
+
+
+ }
+
+
+
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jan 8 18:17:07 2010
@@ -39,11 +39,13 @@
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
+import org.apache.pig.PigCounters;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.impl.util.ObjectSerializer;
public class PigStats {
@@ -186,6 +188,9 @@
jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString());
+ jobStats.put("PIG_STATS_SMM_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString() );
+ jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString() );
+
}
else
{
@@ -194,6 +199,8 @@
jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1");
jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1");
jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1");
+ jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1");
+ jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1");
}
} catch (IOException e) {
@@ -294,6 +301,21 @@
}
+ /**
+ * Sums the SpillableMemoryManager spill counts ("PIG_STATS_SMM_SPILL_COUNT")
+ * across the stats of all root jobs. Returns -1 as soon as any job reports
+ * the -1 sentinel (counter unavailable); jobs with no stats map are skipped.
+ * NOTE(review): each value is parsed twice (once for the -1 check, once for
+ * the sum) — could cache the parse; also throws NumberFormatException if the
+ * key is absent from a job's stats — presumably always populated; verify.
+ */
+ public long getSMMSpillCount() {
+ long spillCount = 0;
+ for (String jid : rootJobIDs) {
+ Map<String, String> jobStats = stats.get(jid);
+ if (jobStats == null) continue;
+ if (Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"))==-1L)
+ {
+ spillCount = -1L;
+ break;
+ }
+ spillCount += Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"));
+ }
+ return spillCount;
+ }
+
+
private long getLocalBytesWritten() {
for(PhysicalOperator op : php.getLeaves())
return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));
Modified: hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml Fri Jan 8 18:17:07 2010
@@ -324,5 +324,9 @@
<Field name = "res" />
<Bug pattern="MF_CLASS_MASKS_FIELD" />
</Match>
-
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher" />
+ <Method name = "launchPig" />
+ <Bug pattern="DE_MIGHT_IGNORE" />
+ </Match>
</FindBugsFilter>
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java Fri Jan 8 18:17:07 2010
@@ -28,8 +28,8 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.test.utils.LocalSeekableInputStream;
import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.tools.bzip2r.CBZip2InputStream;
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java Fri Jan 8 18:17:07 2010
@@ -424,4 +424,39 @@
assertTrue(iter.hasNext()==false);
}
+
+ // See PIG-761
+ @Test
+ public void testLimitPOPackageAnnotator() throws Exception{
+ File tmpFile1 = File.createTempFile("test1", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+ ps1.println("1\t2\t3");
+ ps1.println("2\t5\t2");
+ ps1.close();
+
+ File tmpFile2 = File.createTempFile("test2", "txt");
+ PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
+ ps2.println("1\t1");
+ ps2.println("2\t2");
+ ps2.close();
+
+ pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("C = LIMIT B 100;");
+ pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+
+ assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
+
+ assertFalse(iter.hasNext());
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java Fri Jan 8 18:17:07 2010
@@ -74,7 +74,7 @@
public void testGetAbsolutePath3() throws IOException {
// test case: remote hdfs path
String absPath = "hdfs://myhost.mydomain:37765/data/passwd";
- Assert.assertEquals(curHdfsRoot + "/data/passwd",
+ Assert.assertEquals(absPath,
LoadFunc.getAbsolutePath(absPath, curHdfsDir));
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java Fri Jan 8 18:17:07 2010
@@ -37,7 +37,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
+import org.apache.pig.pen.LocalLogToPhyTranslationVisitor;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Fri Jan 8 18:17:07 2010
@@ -111,6 +111,63 @@
public void tearDown() throws Exception {
myPig = null;
}
+
+ public void testMultiQueryJiraPig1171() {
+
+ // test case: Problems with some top N queries
+
+ String INPUT_FILE = "abc";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("1\tapple\t3");
+ w.println("2\torange\t4");
+ w.println("3\tpersimmon\t5");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("A1 = Order A by a desc;");
+ myPig.registerQuery("A2 = limit A1 1;");
+ myPig.registerQuery("B = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("B1 = Order B by a desc;");
+ myPig.registerQuery("B2 = limit B1 1;");
+
+ myPig.registerQuery("C = cross A2, B2;");
+
+ Iterator<Tuple> iter = myPig.openIterator("C");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(3L,'persimmon',5,3L,'persimmon',5)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
public void testMultiQueryJiraPig1157() {
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java Fri Jan 8 18:17:07 2010
@@ -32,7 +32,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.pen.physicalOperators.POCogroup;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java Fri Jan 8 18:17:07 2010
@@ -28,7 +28,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.pen.physicalOperators.POCross;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java Fri Jan 8 18:17:07 2010
@@ -1656,4 +1656,115 @@
"No map keys pruned for C"}));
}
+ // See PIG-1165
+ @Test
+ public void testOrderbyWrongSignature() throws Exception {
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+ pigServer.registerQuery("C = order A by a1;");
+ pigServer.registerQuery("D = join C by a1, B by b0;");
+ pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+
+ assertTrue(t.size()==3);
+ assertTrue(t.toString().equals("(2,2,2)"));
+
+ assertFalse(iter.hasNext());
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2",
+ "No map keys pruned for A", "No column pruned for B", "No map keys pruned for B"}));
+ }
+
+ // See PIG-1146
+ @Test
+ public void testUnionMixedPruning() throws Exception {
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
+ pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
+ pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
+ pigServer.registerQuery("D = union A, C;");
+ pigServer.registerQuery("E = foreach D generate $0, $2;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ Collection<String> results = new HashSet<String>();
+ results.add("(1,3)");
+ results.add("(2,2)");
+ results.add("(1,1)");
+ results.add("(2,2)");
+
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+
+ assertTrue(t.size()==2);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.size()==2);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.size()==2);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.size()==2);
+ assertTrue(results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+ "No map keys pruned for A", "No column pruned for B",
+ "No map keys pruned for B"}));
+ }
+
+ // See PIG-1176
+ @Test
+ public void testUnionMixedSchemaPruning() throws Exception {
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = foreach A generate a0;;");
+ pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
+ pigServer.registerQuery("D = foreach C generate $0;");
+ pigServer.registerQuery("E = union B, D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ Collection<String> results = new HashSet<String>();
+ results.add("(1)");
+ results.add("(2)");
+ results.add("(1)");
+ results.add("(2)");
+
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+
+ assertTrue(t.size()==1);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.size()==1);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.size()==1);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.size()==1);
+ assertTrue(results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+
+ assertTrue(emptyLogFileMessage());
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java Fri Jan 8 18:17:07 2010
@@ -977,5 +977,27 @@
}
+ // See PIG-1172
+ @Test
+ public void testForeachJoinRequiredField() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
+ planTester.buildPlan("B = FOREACH A generate flatten($0);");
+ planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
+ planTester.buildPlan("D = JOIN B by a1, C by c1;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Fri Jan 8 18:17:07 2010
@@ -17,12 +17,12 @@
*/
package org.apache.pig.test;
+import java.io.File;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -44,6 +44,12 @@
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.pen.physicalOperators.POCounter;
+import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -58,22 +64,24 @@
import org.junit.Test;
public class TestStore extends junit.framework.TestCase {
-
+ POStore st;
DataBag inpDB;
static MiniCluster cluster = MiniCluster.buildCluster();
- PigServer pig;
PigContext pc;
+ POProject proj;
+ PigServer pig;
+ POCounter pcount;
+
String inputFileName;
String outputFileName;
-
+
@Override
@Before
public void setUp() throws Exception {
pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pc = pig.getPigContext();
inputFileName = "/tmp/TestStore-" + new Random().nextLong() + ".txt";
- outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() +
- ".txt";
+ outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
}
@Override
@@ -87,14 +95,26 @@
private void storeAndCopyLocally(DataBag inpDB) throws Exception {
setUpInputFileOnCluster(inpDB);
String script = "a = load '" + inputFileName + "'; " +
- "store a into '" + outputFileName + "' using PigStorage(':');" +
- "fs -ls /tmp";
+ "store a into '" + outputFileName + "' using PigStorage(':');" +
+ "fs -ls /tmp";
pig.setBatchOn();
Util.registerMultiLineQuery(pig, script);
pig.executeBatch();
Util.copyFromClusterToLocal(cluster, outputFileName + "/part-m-00000", outputFileName);
}
+ private PigStats store() throws Exception {
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(proj);
+ pp.add(st);
+ pp.add(pcount);
+ //pp.connect(proj, st);
+ pp.connect(proj, pcount);
+ pp.connect(pcount, st);
+ pc.setExecType(ExecType.LOCAL);
+ return new MapReduceLauncher().launchPig(pp, "TestStore", pc);
+ }
+
@Test
public void testStore() throws Exception {
inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
@@ -159,7 +179,6 @@
inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
storeAndCopyLocally(inpDB);
PigStorage ps = new PigStorage(":");
-
int size = 0;
BufferedReader br = new BufferedReader(new FileReader(outputFileName));
for(String line=br.readLine();line!=null;line=br.readLine()){
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java Fri Jan 8 18:17:07 2010
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.backend.datastorage.*;
+
+public class LocalSeekableInputStream extends SeekableInputStream {
+
+ protected RandomAccessFile file;
+ protected long curMark;
+
+ public LocalSeekableInputStream(File file) throws FileNotFoundException {
+ this.file = new RandomAccessFile(file, "r");
+ this.curMark = 0;
+ }
+
+ @Override
+ public void seek(long offset, FLAGS whence) throws IOException {
+ long targetPos;
+
+ switch (whence) {
+ case SEEK_SET: {
+ targetPos = offset;
+ break;
+ }
+ case SEEK_CUR: {
+ targetPos = this.file.getFilePointer() + offset;
+ break;
+ }
+ case SEEK_END: {
+ targetPos = this.file.length() + offset;
+ break;
+ }
+ default: {
+ throw new IOException("Invalid seek option: " + whence);
+ }
+ }
+
+ this.file.seek(targetPos);
+ }
+
+ @Override
+ public long tell() throws IOException {
+ return this.file.getFilePointer();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return this.file.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return this.file.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len ) throws IOException {
+ return this.file.read(b, off, len);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int)( this.file.length() - this.file.getFilePointer() );
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long skipped = 0;
+
+ if (n > 0) {
+ skipped = this.file.length() - tell();
+
+ seek(n, FLAGS.SEEK_CUR);
+ }
+
+ return skipped;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.file.close();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ try {
+ this.curMark = tell();
+ }
+ catch (IOException e) {
+ ;
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ seek(this.curMark, FLAGS.SEEK_SET);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+}