You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/12/18 03:39:41 UTC
svn commit: r1423259 - in /pig/branches/branch-0.11: ./ src/org/apache/pig/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/newplan/logical/optimizer/ test/org/apache/pig/test/
Author: jcoveney
Date: Tue Dec 18 02:39:38 2012
New Revision: 1423259
URL: http://svn.apache.org/viewvc?rev=1423259&view=rev
Log:
PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)
Added:
pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/
pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
Modified:
pig/branches/branch-0.11/CHANGES.txt
pig/branches/branch-0.11/src/org/apache/pig/Main.java
pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java
pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java
Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Tue Dec 18 02:39:38 2012
@@ -332,6 +332,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)
+
PIG-3044: hotfix to remove divide by 0 error (jcoveney)
PIG-3033: test-patch failed with javadoc warnings (fang fang chen via cheolsoo)
Modified: pig/branches/branch-0.11/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/Main.java?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/Main.java Tue Dec 18 02:39:38 2012
@@ -114,13 +114,16 @@ public class Main {
Attributes attr=null;
try {
String findContainingJar = JarManager.findContainingJar(Main.class);
- JarFile jar = new JarFile(findContainingJar);
- final Manifest manifest = jar.getManifest();
- final Map<String,Attributes> attrs = manifest.getEntries();
- attr = attrs.get("org/apache/pig");
+ if (findContainingJar != null) {
+ JarFile jar = new JarFile(findContainingJar);
+ final Manifest manifest = jar.getManifest();
+ final Map<String,Attributes> attrs = manifest.getEntries();
+ attr = attrs.get("org/apache/pig");
+ } else {
+ log.info("Unable to read pigs manifest file as we are not running from a jar, version information unavailable");
+ }
} catch (Exception e) {
- log.warn("Unable to read pigs manifest file, version information unavailable");
- log.warn("Exception: "+e);
+ log.warn("Unable to read pigs manifest file, version information unavailable", e);
}
if (attr!=null) {
version = attr.getValue("Implementation-Version");
@@ -491,8 +494,8 @@ static int run(String args[], PigProgres
if (i != 0) sb.append(' ');
sb.append(remainders[i]);
}
-
- sb.append('\n');
+
+ sb.append('\n');
scriptState.setScript(sb.toString());
Modified: pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LOJoin.java Tue Dec 18 02:39:38 2012
@@ -21,12 +21,17 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+import com.google.common.collect.Sets;
public class LOJoin extends LogicalRelationalOperator {
@@ -43,7 +48,7 @@ public class LOJoin extends LogicalRelat
MERGESPARSE // Sort Merge Index Join
};
-
+
/**
* LOJoin contains a list of logical operators corresponding to the
* relational operators and a list of generates for each relational
@@ -51,21 +56,21 @@ public class LOJoin extends LogicalRelat
* for the columns that are projected
*/
//private static Log log = LogFactory.getLog(LOJoin.class);
- // expression plans for each input.
+ // expression plans for each input.
private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
// indicator for each input whether it is inner
private boolean[] mInnerFlags;
private JOINTYPE mJoinType; // Retains the type of the join
-
- /**
+
+ /**
* static constant to refer to the option of selecting a join type
*/
public final static Integer OPTION_JOIN = 1;
-
+
public LOJoin(LogicalPlan plan) {
- super("LOJoin", plan);
+ super("LOJoin", plan);
}
-
+
public LOJoin(LogicalPlan plan,
MultiMap<Integer, LogicalExpressionPlan> joinPlans,
JOINTYPE jt,
@@ -75,15 +80,15 @@ public class LOJoin extends LogicalRelat
mJoinType = jt;
mInnerFlags = isInner;
}
-
+
public void setJoinPlans(MultiMap<Integer, LogicalExpressionPlan> joinPlans) {
this.mJoinPlans = joinPlans;
}
-
+
public void setInnerFlags(boolean[] isInner) {
this.mInnerFlags = isInner;
}
-
+
public void setJoinType(JOINTYPE jt) {
this.mJoinType = jt;
}
@@ -91,23 +96,23 @@ public class LOJoin extends LogicalRelat
public boolean isInner(int inputIndex) {
return mInnerFlags[inputIndex];
}
-
+
public boolean[] getInnerFlags() {
return mInnerFlags;
}
-
+
public JOINTYPE getJoinType() {
return mJoinType;
}
-
+
public void resetJoinType() {
mJoinType = JOINTYPE.HASH;
}
-
+
public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
return mJoinPlans.get(inputIndex);
}
-
+
/**
* Get all of the expressions plans that are in this join.
* @return collection of all expression plans.
@@ -115,54 +120,81 @@ public class LOJoin extends LogicalRelat
public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
return mJoinPlans;
}
-
+
public Collection<LogicalExpressionPlan> getExpressionPlanValues() {
return mJoinPlans.values();
}
-
+
@Override
public LogicalSchema getSchema() throws FrontendException {
// if schema is calculated before, just return
if (schema != null) {
return schema;
}
-
+
List<Operator> inputs = null;
inputs = plan.getPredecessors(this);
if (inputs == null) {
return null;
}
-
+
List<LogicalSchema.LogicalFieldSchema> fss = new ArrayList<LogicalSchema.LogicalFieldSchema>();
-
+
for (Operator op : inputs) {
LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
- // the schema of one input is unknown, so the join schema is unknown, just return
+ // the schema of one input is unknown, so the join schema is unknown, just return
if (inputSchema == null) {
schema = null;
return schema;
}
-
+
for (int i=0; i<inputSchema.size(); i++) {
LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
LogicalSchema.LogicalFieldSchema newFS = null;
- if(fs.alias != null) {
- newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
+ if(fs.alias != null) {
+ newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
} else {
newFS = new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid);
- }
- fss.add(newFS);
- }
- }
+ }
+ fss.add(newFS);
+ }
+ }
+
+ fixDuplicateUids(fss);
schema = new LogicalSchema();
for(LogicalSchema.LogicalFieldSchema fieldSchema: fss) {
schema.addField(fieldSchema);
- }
-
+ }
+
return schema;
}
-
+
+ /**
+ * In the case of a join it is possible for multiple columns to have been derived from the same
+ * column and thus have duplicate UIDs. This detects that case and gives every repeated uid (including uids of nested fields) a fresh value from LogicalExpression.getNextUid().
+ * See PIG-3020 and PIG-3093 for more information.
+ * @param fss a list of LogicalFieldSchemas to check the uids of; repeated uids are overwritten in place
+ */
+ private void fixDuplicateUids(List<LogicalFieldSchema> fss) {
+ Set<Long> uids = Sets.newHashSet();
+ for (LogicalFieldSchema lfs : fss) {
+ addFieldSchemaUidsToSet(uids, lfs);
+ }
+ }
+
+ private void addFieldSchemaUidsToSet(Set<Long> uids, LogicalFieldSchema lfs) { // records lfs.uid in uids (renumbering it while already seen), then recurses into lfs.schema
+ while (!uids.add(lfs.uid)) {
+ lfs.uid = LogicalExpression.getNextUid(); // loop in case the freshly issued uid is itself already in the set
+ }
+ LogicalSchema ls = lfs.schema;
+ if (ls != null) { // NOTE(review): getSchema passes the input's fs.schema by reference, so unless the LogicalFieldSchema constructor copies it, renumbering nested uids here also changes the input operator's schema — confirm
+ for (LogicalFieldSchema lfs2 : ls.getFields()) {
+ addFieldSchemaUidsToSet(uids, lfs2);
+ }
+ }
+ }
+
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
@@ -171,7 +203,7 @@ public class LOJoin extends LogicalRelat
((LogicalRelationalNodesVisitor)v).visit(this);
}
-
+
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof LOJoin) {
@@ -182,12 +214,12 @@ public class LOJoin extends LogicalRelat
if (mInnerFlags[i] != oj.mInnerFlags[i]) return false;
}
if (!checkEquality(oj)) return false;
-
+
if (mJoinPlans.size() != oj.mJoinPlans.size()) return false;
-
+
// Now, we need to make sure that for each input we are projecting
// the same columns. This is slightly complicated since MultiMap
- // doesn't return any particular order, so we have to find the
+ // doesn't return any particular order, so we have to find the
// matching input in each case.
for (Integer p : mJoinPlans.keySet()) {
Iterator<Integer> iter = oj.mJoinPlans.keySet().iterator();
@@ -200,7 +232,7 @@ public class LOJoin extends LogicalRelat
Collection<LogicalExpressionPlan> c = mJoinPlans.get(p);
Collection<LogicalExpressionPlan> oc = oj.mJoinPlans.get(op);
if (c.size() != oc.size()) return false;
-
+
if (!(c instanceof List) || !(oc instanceof List)) {
throw new FrontendException(
"Expected list of expression plans", 2238);
@@ -219,12 +251,12 @@ public class LOJoin extends LogicalRelat
return false;
}
}
-
+
@Override
public String getName() {
return name + "(" + mJoinType.toString() + ")";
}
-
+
public List<Operator> getInputs(LogicalPlan plan) {
return plan.getPredecessors(this);
}
Added: pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java?rev=1423259&view=auto
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java (added)
+++ pig/branches/branch-0.11/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java Tue Dec 18 02:39:38 2012
@@ -0,0 +1,82 @@
+package org.apache.pig.newplan.logical.optimizer;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * See: https://issues.apache.org/jira/browse/PIG-3020
+ *
+ */
+public class TestSchemaResetter {
+
+ @Test
+ public void testSchemaResetter() throws IOException {
+ new File("build/test/tmp/").mkdirs();
+ Util.createLocalInputFile("build/test/tmp/TestSchemaResetter.pig", new String[] {
+ "A = LOAD 'foo' AS (group:tuple(uid, dst_id));",
+ "edges_both = FOREACH A GENERATE",
+ " group.uid AS src_id,",
+ " group.dst_id AS dst_id;",
+ "both_counts = GROUP edges_both BY src_id;",
+ "both_counts = FOREACH both_counts GENERATE",
+ " group AS src_id, SIZE(edges_both) AS size_both;",
+ "",
+ "edges_bq = FOREACH A GENERATE",
+ " group.uid AS src_id,",
+ " group.dst_id AS dst_id;",
+ "bq_counts = GROUP edges_bq BY src_id;",
+ "bq_counts = FOREACH bq_counts GENERATE",
+ " group AS src_id, SIZE(edges_bq) AS size_bq;",
+ "",
+ "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;",
+ "store per_user_set_sizes into 'foo';"
+ });
+ assertEquals(0, PigRunner.run(new String[] {"-x", "local", "-c", "build/test/tmp/TestSchemaResetter.pig" } , null).getReturnCode());
+ }
+
+ @Test
+ public void testSchemaResetterExec() throws IOException {
+ PigServer pigServer = new PigServer(LOCAL);
+ Data data = Storage.resetData(pigServer);
+ data.set("input",
+ tuple(tuple("1", "2")),
+ tuple(tuple("2", "3")),
+ tuple(tuple("2", "4")));
+ pigServer.registerQuery(
+ "A = LOAD 'input' USING mock.Storage() AS (group:tuple(uid, dst_id));" +
+ "edges_both = FOREACH A GENERATE" +
+ " group.uid AS src_id," +
+ " group.dst_id AS dst_id;" +
+ "both_counts = GROUP edges_both BY src_id;" +
+ "both_counts = FOREACH both_counts GENERATE" +
+ " group AS src_id, SIZE(edges_both) AS size_both;" +
+ "edges_bq = FOREACH A GENERATE" +
+ " group.uid AS src_id," +
+ " group.dst_id AS dst_id;" +
+ "bq_counts = GROUP edges_bq BY src_id;" +
+ "bq_counts = FOREACH bq_counts GENERATE" +
+ " group AS src_id, SIZE(edges_bq) AS size_bq;" +
+ "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;" +
+ "store per_user_set_sizes into 'output' USING mock.Storage();");
+ List<Tuple> list = data.get("output");
+ Collections.sort(list);
+ assertEquals("list: "+list, 2, list.size());
+ assertEquals("(1,1,1,1)", list.get(0).toString());
+ assertEquals("(2,2,2,2)", list.get(1).toString());
+ }
+}
Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java?rev=1423259&r1=1423258&r2=1423259&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/TestJoin.java Tue Dec 18 02:39:38 2012
@@ -18,16 +18,21 @@
package org.apache.pig.test;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -35,6 +40,7 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -48,31 +54,33 @@ import org.junit.runners.JUnit4;
import junit.framework.TestCase;
+import com.google.common.collect.Sets;
+
/**
* Test cases to test join statement
*/
@RunWith(JUnit4.class)
public class TestJoin extends TestCase {
-
+
static MiniCluster cluster;
private PigServer pigServer;
TupleFactory mTf = TupleFactory.getInstance();
BagFactory mBf = BagFactory.getInstance();
ExecType[] execTypes = new ExecType[] {ExecType.LOCAL, ExecType.MAPREDUCE};
-
+
@Before
@Override
public void setUp() throws Exception{
FileLocalizer.setR(new Random());
}
-
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
private void setUp(ExecType execType) throws ExecException {
// cause a reinitialization of FileLocalizer's
// internal state
@@ -81,10 +89,10 @@ public class TestJoin extends TestCase {
cluster = MiniCluster.buildCluster();
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
} else if(execType == ExecType.LOCAL) {
- pigServer = new PigServer(ExecType.LOCAL);
+ pigServer = new PigServer(ExecType.LOCAL);
}
}
-
+
private String createInputFile(ExecType execType, String fileNameHint, String[] data) throws IOException {
String fileName = "";
if(execType == ExecType.MAPREDUCE) {
@@ -96,7 +104,7 @@ public class TestJoin extends TestCase {
}
return fileName;
}
-
+
private void deleteInputFile(ExecType execType, String fileName) throws IOException {
if(execType == ExecType.MAPREDUCE) {
Util.deleteFile(cluster, fileName);
@@ -126,7 +134,7 @@ public class TestJoin extends TestCase {
"",
""
};
-
+
String firstInput = createInputFile(ExecType.MAPREDUCE, "a.txt", input1);
String secondInput = createInputFile(ExecType.MAPREDUCE, "b.txt", input2);
String script = "a = load 'a.txt' using PigStorage(' ');" +
@@ -138,7 +146,7 @@ public class TestJoin extends TestCase {
deleteInputFile(ExecType.MAPREDUCE, firstInput);
deleteInputFile(ExecType.MAPREDUCE, secondInput);
}
-
+
@Test
public void testJoinUnkownSchema() throws Exception {
// If any of the input schema is unknown, the resulting schema should be unknown as well
@@ -167,11 +175,11 @@ public class TestJoin extends TestCase {
"good\tmorning",
"\tevening"
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
Tuple expectedResult = (Tuple)Util.getPigConstant("('hello',1,'hello','world')");
-
+
// with schema
String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (n:chararray, a:int); " +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (n:chararray, m:chararray); " +
@@ -181,7 +189,7 @@ public class TestJoin extends TestCase {
assertEquals(true, it.hasNext());
assertEquals(expectedResult, it.next());
assertEquals(false, it.hasNext());
-
+
// without schema
script = "a = load '"+ Util.encodeEscape(firstInput) + "'; " +
"b = load '" + Util.encodeEscape(secondInput) + "'; " +
@@ -195,8 +203,8 @@ public class TestJoin extends TestCase {
deleteInputFile(execType, secondInput);
}
}
-
-
+
+
@Test
public void testJoinSchema() throws Exception {
for (ExecType execType : execTypes) {
@@ -210,11 +218,11 @@ public class TestJoin extends TestCase {
"1\thello",
"4\tbye",
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
Tuple expectedResult = (Tuple)Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");
-
+
// with schema
String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (k:int, l:chararray); " +
@@ -225,7 +233,7 @@ public class TestJoin extends TestCase {
assertEquals(true, it.hasNext());
assertEquals(expectedResult, it.next());
assertEquals(false, it.hasNext());
-
+
// schema with duplicates
script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int, l:chararray); " +
@@ -241,7 +249,7 @@ public class TestJoin extends TestCase {
exceptionThrown = true;
}
assertEquals(true, exceptionThrown);
-
+
// schema with duplicates with resolution
script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int, l:chararray); " +
@@ -256,7 +264,7 @@ public class TestJoin extends TestCase {
deleteInputFile(execType, secondInput);
}
}
-
+
@Test
public void testJoinSchema2() throws Exception {
// test join where one load does not have schema
@@ -271,20 +279,20 @@ public class TestJoin extends TestCase {
"1\thello",
"4\tbye",
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
Tuple expectedResultCharArray =
(Tuple)Util.getPigConstant("('1','2','1','hello','1','2','1','hello')");
-
+
Tuple expectedResult = TupleFactory.getInstance().newTuple();
for(Object field : expectedResultCharArray.getAll()){
expectedResult.append(new DataByteArray(field.toString()));
}
-
+
// with schema
String script = "a = load '"+ Util.encodeEscape(firstInput) +"' ; " +
- //re-using alias a for new operator below, doing this intentionally
+ //re-using alias a for new operator below, doing this intentionally
// because such use case has been seen
"a = foreach a generate $0 as i, $1 as j ;" +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (k, l); " +
@@ -298,9 +306,9 @@ public class TestJoin extends TestCase {
assertEquals(false, it.hasNext());
deleteInputFile(execType, firstInput);
deleteInputFile(execType, secondInput);
-
+
}
-
+
@Test
public void testLeftOuterJoin() throws Exception {
for (ExecType execType : execTypes) {
@@ -314,18 +322,18 @@ public class TestJoin extends TestCase {
"hello\tworld",
"good\tmorning",
"\tevening"
-
+
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('hello',1,'hello','world')",
"('bye',2,null,null)",
"(null,3,null,null)"
});
-
+
// with and without optional outer
for(int i = 0; i < 2; i++) {
//with schema
@@ -339,7 +347,7 @@ public class TestJoin extends TestCase {
script += "d = order c by $1;";
// ensure we parse correctly
Util.buildLp(pigServer, script);
-
+
// run query and test results only once
if(i == 0) {
Util.registerMultiLineQuery(pigServer, script);
@@ -349,7 +357,7 @@ public class TestJoin extends TestCase {
assertEquals(expectedResults.get(counter++), it.next());
}
assertEquals(expectedResults.size(), counter);
-
+
// without schema
script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
"b = load '"+ Util.encodeEscape(secondInput) +"'; ";
@@ -384,9 +392,9 @@ public class TestJoin extends TestCase {
"hello\tworld",
"good\tmorning",
"\tevening"
-
+
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
@@ -408,7 +416,7 @@ public class TestJoin extends TestCase {
script += "d = order c by $3;";
// ensure we parse correctly
Util.buildLp(pigServer, script);
-
+
// run query and test results only once
if(i == 0) {
Util.registerMultiLineQuery(pigServer, script);
@@ -418,7 +426,7 @@ public class TestJoin extends TestCase {
assertEquals(expectedResults.get(counter++), it.next());
}
assertEquals(expectedResults.size(), counter);
-
+
// without schema
script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
"b = load '"+ Util.encodeEscape(secondInput) +"'; " ;
@@ -439,7 +447,7 @@ public class TestJoin extends TestCase {
deleteInputFile(execType, secondInput);
}
}
-
+
@Test
public void testFullOuterJoin() throws Exception {
for (ExecType execType : execTypes) {
@@ -453,9 +461,9 @@ public class TestJoin extends TestCase {
"hello\tworld",
"good\tmorning",
"\tevening"
-
+
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
@@ -479,7 +487,7 @@ public class TestJoin extends TestCase {
script += "d = order c by $1, $3;";
// ensure we parse correctly
Util.buildLp(pigServer, script);
-
+
// run query and test results only once
if(i == 0) {
Util.registerMultiLineQuery(pigServer, script);
@@ -489,7 +497,7 @@ public class TestJoin extends TestCase {
assertEquals(expectedResults.get(counter++), it.next());
}
assertEquals(expectedResults.size(), counter);
-
+
// without schema
script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
"b = load '"+ Util.encodeEscape(secondInput) +"'; " ;
@@ -510,7 +518,7 @@ public class TestJoin extends TestCase {
deleteInputFile(execType, secondInput);
}
}
-
+
@Test
public void testMultiOuterJoinFailure() throws ExecException {
setUp(ExecType.LOCAL);
@@ -521,7 +529,7 @@ public class TestJoin extends TestCase {
for (int i = 0; i < types.length; i++) {
boolean errCaught = false;
try {
- String q = query +
+ String q = query +
"d = join a by $0 " + types[i] + " outer, b by $0, c by $0;" +
"store d into 'output';";
Util.buildLp(pigServer, q);
@@ -530,11 +538,11 @@ public class TestJoin extends TestCase {
assertTrue(e.getMessage().contains("mismatched input ',' expecting SEMI_COLON"));
}
assertEquals(true, errCaught);
-
+
}
-
+
}
-
+
@Test
public void testNonRegularOuterJoinFailure() throws ExecException {
setUp(ExecType.LOCAL);
@@ -546,21 +554,21 @@ public class TestJoin extends TestCase {
for(int j = 0; j < joinTypes.length; j++) {
boolean errCaught = false;
try {
- String q = query + "d = join a by $0 " +
+ String q = query + "d = join a by $0 " +
types[i] + " outer, b by $0 using '" + joinTypes[j] +"';" +
"store d into 'output';";
Util.buildLp(pigServer, q);
-
+
} catch(Exception e) {
errCaught = true;
// This after adding support of LeftOuter Join to replicated Join
- assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
+ assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
}
assertEquals( i == 0 ? false : true, errCaught);
}
}
}
-
+
@Test
public void testJoinTupleFieldKey() throws Exception{
for (ExecType execType : execTypes) {
@@ -573,24 +581,24 @@ public class TestJoin extends TestCase {
"(1,b)",
"(2,bb)"
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
-
+
String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (a:tuple(a1:int, a2:chararray));" +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (b:tuple(b1:int, b2:chararray));" +
"c = join a by a.a1, b by b.b1;";
Util.registerMultiLineQuery(pigServer, script);
Iterator<Tuple> it = pigServer.openIterator("c");
-
+
assertTrue(it.hasNext());
Tuple t = it.next();
assertTrue(t.toString().equals("((1,a),(1,b))"));
-
+
assertTrue(it.hasNext());
t = it.next();
assertTrue(t.toString().equals("((2,aa),(2,bb))"));
-
+
deleteInputFile(execType, firstInput);
deleteInputFile(execType, secondInput);
}
@@ -608,27 +616,27 @@ public class TestJoin extends TestCase {
"1\t",
"2\taa"
};
-
+
String firstInput = createInputFile(execType, "a.txt", input1);
String secondInput = createInputFile(execType, "b.txt", input2);
-
+
String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (a1:int, a2:chararray);" +
"b = load '"+ Util.encodeEscape(secondInput) +"' as (b1:int, b2:chararray);" +
"c = join a by (a1, a2), b by (b1, b2);";
Util.registerMultiLineQuery(pigServer, script);
Iterator<Tuple> it = pigServer.openIterator("c");
-
+
assertTrue(it.hasNext());
Tuple t = it.next();
assertTrue(t.toString().equals("(2,aa,2,aa)"));
-
+
assertFalse(it.hasNext());
-
+
deleteInputFile(execType, firstInput);
deleteInputFile(execType, secondInput);
}
}
-
+
@Test
public void testLiteralsForJoinAlgoSpecification1() throws Exception {
setUp(ExecType.LOCAL);
@@ -641,7 +649,7 @@ public class TestJoin extends TestCase {
LOJoin join = (LOJoin)lp.getPredecessors( store ).get(0);
assertEquals(JOINTYPE.MERGE, join.getJoinType());
}
-
+
@Test
public void testLiteralsForJoinAlgoSpecification2() throws Exception {
setUp(ExecType.LOCAL);
@@ -654,12 +662,12 @@ public class TestJoin extends TestCase {
LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
assertEquals(JOINTYPE.HASH, join.getJoinType());
}
-
+
@Test
public void testLiteralsForJoinAlgoSpecification5() throws Exception {
setUp(ExecType.LOCAL);
String query = "a = load 'A'; " +
- "b = load 'B'; " +
+ "b = load 'B'; " +
"c = Join a by $0, b by $0 using 'default'; "+
"store c into 'output';";
LogicalPlan lp = Util.buildLp(pigServer, query);
@@ -667,7 +675,7 @@ public class TestJoin extends TestCase {
LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
assertEquals(JOINTYPE.HASH, join.getJoinType());
}
-
+
@Test
public void testLiteralsForJoinAlgoSpecification3() throws Exception {
setUp(ExecType.LOCAL);
@@ -680,11 +688,11 @@ public class TestJoin extends TestCase {
LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
}
-
+
@Test
public void testLiteralsForJoinAlgoSpecification4() throws Exception {
setUp(ExecType.LOCAL);
- String query = "a = load 'A'; " +
+ String query = "a = load 'A'; " +
"b = load 'B'; " +
"c = Join a by $0, b by $0 using 'replicated'; "+
"store c into 'output';";
@@ -693,4 +701,29 @@ public class TestJoin extends TestCase {
LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
}
+
+ // See: https://issues.apache.org/jira/browse/PIG-3093 (joining A with a foreach over A must keep schemas resolvable and return each input tuple exactly once)
+ @Test
+ public void testIndirectSelfJoinRealias() throws Exception {
+ setUp(ExecType.LOCAL);
+ Data data = resetData(pigServer);
+
+ Set<Tuple> tuples = Sets.newHashSet(tuple("a"), tuple("b"), tuple("c"));
+ data.set("foo", Utils.getSchemaFromString("field1:chararray"), tuples);
+ pigServer.registerQuery("A = load 'foo' using mock.Storage();");
+ pigServer.registerQuery("B = foreach A generate *;");
+ pigServer.registerQuery("C = join A by field1, B by field1;");
+ assertEquals(Utils.getSchemaFromString("A::field1:chararray, B::field1:chararray"), pigServer.dumpSchema("C"));
+ pigServer.registerQuery("D = foreach C generate B::field1, A::field1 as field2;");
+ assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("D"));
+ pigServer.registerQuery("E = foreach D generate field1, field2;");
+ assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("E"));
+ pigServer.registerQuery("F = foreach E generate field2;");
+ Iterator<Tuple> it = pigServer.openIterator("F");
+ assertTrue(it.hasNext());
+ while (it.hasNext()) {
+ assertTrue(tuples.remove(it.next())); // fails on an unexpected or duplicated output row
+ }
+ assertTrue("rows missing from join output: " + tuples, tuples.isEmpty()); // previously assertFalse(it.hasNext()), which the loop above already guarantees
+ }
}