You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/12/18 01:19:22 UTC
svn commit: r1423231 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/newplan/logical/relational/
test/org/apache/pig/newplan/logical/optimizer/ test/org/apache/pig/test/
Author: jcoveney
Date: Tue Dec 18 00:19:20 2012
New Revision: 1423231
URL: http://svn.apache.org/viewvc?rev=1423231&view=rev
Log:
PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)
Added:
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
pig/trunk/test/org/apache/pig/test/TestJoin.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Dec 18 00:19:20 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3020: "Duplicate uid in schema" error when joining two relations derived from the same load statement (jcoveney)
+
PIG-2857: Add a -tagPath option to PigStorage (prkommireddi via cheolsoo)
PIG-2341: Need better documentation on Pig/HBase integration (jthakrar and billgraham via billgraham)
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Tue Dec 18 00:19:20 2012
@@ -114,13 +114,16 @@ public class Main {
Attributes attr=null;
try {
String findContainingJar = JarManager.findContainingJar(Main.class);
- JarFile jar = new JarFile(findContainingJar);
- final Manifest manifest = jar.getManifest();
- final Map<String,Attributes> attrs = manifest.getEntries();
- attr = attrs.get("org/apache/pig");
+ if (findContainingJar != null) {
+ JarFile jar = new JarFile(findContainingJar);
+ final Manifest manifest = jar.getManifest();
+ final Map<String,Attributes> attrs = manifest.getEntries();
+ attr = attrs.get("org/apache/pig");
+ } else {
+ log.info("Unable to read pigs manifest file as we are not running from a jar, version information unavailable");
+ }
} catch (Exception e) {
- log.warn("Unable to read pigs manifest file, version information unavailable");
- log.warn("Exception: "+e);
+ log.warn("Unable to read pigs manifest file, version information unavailable", e);
}
if (attr!=null) {
version = attr.getValue("Implementation-Version");
@@ -491,8 +494,8 @@ static int run(String args[], PigProgres
if (i != 0) sb.append(' ');
sb.append(remainders[i]);
}
-
- sb.append('\n');
+
+ sb.append('\n');
scriptState.setScript(sb.toString());
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Tue Dec 18 00:19:20 2012
@@ -21,12 +21,17 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+import com.google.common.collect.Sets;
public class LOJoin extends LogicalRelationalOperator {
@@ -43,7 +48,7 @@ public class LOJoin extends LogicalRelat
MERGESPARSE // Sort Merge Index Join
};
-
+
/**
* LOJoin contains a list of logical operators corresponding to the
* relational operators and a list of generates for each relational
@@ -51,21 +56,21 @@ public class LOJoin extends LogicalRelat
* for the columns that are projected
*/
//private static Log log = LogFactory.getLog(LOJoin.class);
- // expression plans for each input.
+ // expression plans for each input.
private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
// indicator for each input whether it is inner
private boolean[] mInnerFlags;
private JOINTYPE mJoinType; // Retains the type of the join
-
- /**
+
+ /**
* static constant to refer to the option of selecting a join type
*/
public final static Integer OPTION_JOIN = 1;
-
+
public LOJoin(LogicalPlan plan) {
- super("LOJoin", plan);
+ super("LOJoin", plan);
}
-
+
public LOJoin(LogicalPlan plan,
MultiMap<Integer, LogicalExpressionPlan> joinPlans,
JOINTYPE jt,
@@ -75,15 +80,15 @@ public class LOJoin extends LogicalRelat
mJoinType = jt;
mInnerFlags = isInner;
}
-
+
public void setJoinPlans(MultiMap<Integer, LogicalExpressionPlan> joinPlans) {
this.mJoinPlans = joinPlans;
}
-
+
public void setInnerFlags(boolean[] isInner) {
this.mInnerFlags = isInner;
}
-
+
public void setJoinType(JOINTYPE jt) {
this.mJoinType = jt;
}
@@ -91,23 +96,23 @@ public class LOJoin extends LogicalRelat
public boolean isInner(int inputIndex) {
return mInnerFlags[inputIndex];
}
-
+
public boolean[] getInnerFlags() {
return mInnerFlags;
}
-
+
public JOINTYPE getJoinType() {
return mJoinType;
}
-
+
public void resetJoinType() {
mJoinType = JOINTYPE.HASH;
}
-
+
public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
return mJoinPlans.get(inputIndex);
}
-
+
/**
* Get all of the expressions plans that are in this join.
* @return collection of all expression plans.
@@ -115,54 +120,81 @@ public class LOJoin extends LogicalRelat
public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
return mJoinPlans;
}
-
+
public Collection<LogicalExpressionPlan> getExpressionPlanValues() {
return mJoinPlans.values();
}
-
+
@Override
public LogicalSchema getSchema() throws FrontendException {
// if schema is calculated before, just return
if (schema != null) {
return schema;
}
-
+
List<Operator> inputs = null;
inputs = plan.getPredecessors(this);
if (inputs == null) {
return null;
}
-
+
List<LogicalSchema.LogicalFieldSchema> fss = new ArrayList<LogicalSchema.LogicalFieldSchema>();
-
+
for (Operator op : inputs) {
LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
- // the schema of one input is unknown, so the join schema is unknown, just return
+ // the schema of one input is unknown, so the join schema is unknown, just return
if (inputSchema == null) {
schema = null;
return schema;
}
-
+
for (int i=0; i<inputSchema.size(); i++) {
LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
LogicalSchema.LogicalFieldSchema newFS = null;
- if(fs.alias != null) {
- newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
+ if(fs.alias != null) {
+ newFS = new LogicalSchema.LogicalFieldSchema(((LogicalRelationalOperator)op).getAlias()+"::"+fs.alias ,fs.schema, fs.type, fs.uid);
} else {
newFS = new LogicalSchema.LogicalFieldSchema(fs.alias, fs.schema, fs.type, fs.uid);
- }
- fss.add(newFS);
- }
- }
+ }
+ fss.add(newFS);
+ }
+ }
+
+ fixDuplicateUids(fss);
schema = new LogicalSchema();
for(LogicalSchema.LogicalFieldSchema fieldSchema: fss) {
schema.addField(fieldSchema);
- }
-
+ }
+
return schema;
}
-
+
+ /**
+ * In the case of a join it is possible for multiple columns to have been derived from the same
+ * column and thus have duplicate UIDs. This detects that case and resets the uid.
+ * See PIG-3020 and PIG-3093 for more information.
+ * @param fss a list of LogicalFieldSchemas to check the uids of
+ */
+ private void fixDuplicateUids(List<LogicalFieldSchema> fss) {
+ Set<Long> uids = Sets.newHashSet();
+ for (LogicalFieldSchema lfs : fss) {
+ addFieldSchemaUidsToSet(uids, lfs);
+ }
+ }
+
+ private void addFieldSchemaUidsToSet(Set<Long> uids, LogicalFieldSchema lfs) {
+ while (!uids.add(lfs.uid)) {
+ lfs.uid = LogicalExpression.getNextUid();
+ }
+ LogicalSchema ls = lfs.schema;
+ if (ls != null) {
+ for (LogicalFieldSchema lfs2 : ls.getFields()) {
+ addFieldSchemaUidsToSet(uids, lfs2);
+ }
+ }
+ }
+
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
@@ -171,7 +203,7 @@ public class LOJoin extends LogicalRelat
((LogicalRelationalNodesVisitor)v).visit(this);
}
-
+
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof LOJoin) {
@@ -182,12 +214,12 @@ public class LOJoin extends LogicalRelat
if (mInnerFlags[i] != oj.mInnerFlags[i]) return false;
}
if (!checkEquality(oj)) return false;
-
+
if (mJoinPlans.size() != oj.mJoinPlans.size()) return false;
-
+
// Now, we need to make sure that for each input we are projecting
// the same columns. This is slightly complicated since MultiMap
- // doesn't return any particular order, so we have to find the
+ // doesn't return any particular order, so we have to find the
// matching input in each case.
for (Integer p : mJoinPlans.keySet()) {
Iterator<Integer> iter = oj.mJoinPlans.keySet().iterator();
@@ -200,7 +232,7 @@ public class LOJoin extends LogicalRelat
Collection<LogicalExpressionPlan> c = mJoinPlans.get(p);
Collection<LogicalExpressionPlan> oc = oj.mJoinPlans.get(op);
if (c.size() != oc.size()) return false;
-
+
if (!(c instanceof List) || !(oc instanceof List)) {
throw new FrontendException(
"Expected list of expression plans", 2238);
@@ -219,12 +251,12 @@ public class LOJoin extends LogicalRelat
return false;
}
}
-
+
@Override
public String getName() {
return name + "(" + mJoinType.toString() + ")";
}
-
+
public List<Operator> getInputs(LogicalPlan plan) {
return plan.getPredecessors(this);
}
Added: pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java?rev=1423231&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java (added)
+++ pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestSchemaResetter.java Tue Dec 18 00:19:20 2012
@@ -0,0 +1,82 @@
+package org.apache.pig.newplan.logical.optimizer;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * See: https://issues.apache.org/jira/browse/PIG-3020
+ *
+ */
+public class TestSchemaResetter {
+
+ @Test
+ public void testSchemaResetter() throws IOException {
+ new File("build/test/tmp/").mkdirs();
+ Util.createLocalInputFile("build/test/tmp/TestSchemaResetter.pig", new String[] {
+ "A = LOAD 'foo' AS (group:tuple(uid, dst_id));",
+ "edges_both = FOREACH A GENERATE",
+ " group.uid AS src_id,",
+ " group.dst_id AS dst_id;",
+ "both_counts = GROUP edges_both BY src_id;",
+ "both_counts = FOREACH both_counts GENERATE",
+ " group AS src_id, SIZE(edges_both) AS size_both;",
+ "",
+ "edges_bq = FOREACH A GENERATE",
+ " group.uid AS src_id,",
+ " group.dst_id AS dst_id;",
+ "bq_counts = GROUP edges_bq BY src_id;",
+ "bq_counts = FOREACH bq_counts GENERATE",
+ " group AS src_id, SIZE(edges_bq) AS size_bq;",
+ "",
+ "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;",
+ "store per_user_set_sizes into 'foo';"
+ });
+ assertEquals(0, PigRunner.run(new String[] {"-x", "local", "-c", "build/test/tmp/TestSchemaResetter.pig" } , null).getReturnCode());
+ }
+
+ @Test
+ public void testSchemaResetterExec() throws IOException {
+ PigServer pigServer = new PigServer(LOCAL);
+ Data data = Storage.resetData(pigServer);
+ data.set("input",
+ tuple(tuple("1", "2")),
+ tuple(tuple("2", "3")),
+ tuple(tuple("2", "4")));
+ pigServer.registerQuery(
+ "A = LOAD 'input' USING mock.Storage() AS (group:tuple(uid, dst_id));" +
+ "edges_both = FOREACH A GENERATE" +
+ " group.uid AS src_id," +
+ " group.dst_id AS dst_id;" +
+ "both_counts = GROUP edges_both BY src_id;" +
+ "both_counts = FOREACH both_counts GENERATE" +
+ " group AS src_id, SIZE(edges_both) AS size_both;" +
+ "edges_bq = FOREACH A GENERATE" +
+ " group.uid AS src_id," +
+ " group.dst_id AS dst_id;" +
+ "bq_counts = GROUP edges_bq BY src_id;" +
+ "bq_counts = FOREACH bq_counts GENERATE" +
+ " group AS src_id, SIZE(edges_bq) AS size_bq;" +
+ "per_user_set_sizes = JOIN bq_counts BY src_id LEFT OUTER, both_counts BY src_id;" +
+ "store per_user_set_sizes into 'output' USING mock.Storage();");
+ List<Tuple> list = data.get("output");
+ Collections.sort(list);
+ assertEquals("list: "+list, 2, list.size());
+ assertEquals("(1,1,1,1)", list.get(0).toString());
+ assertEquals("(2,2,2,2)", list.get(1).toString());
+ }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=1423231&r1=1423230&r2=1423231&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Dec 18 00:19:20 2012
@@ -18,6 +18,8 @@
package org.apache.pig.test;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -28,11 +30,13 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -40,6 +44,7 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
@@ -49,6 +54,8 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Sets;
+
/**
* Test cases to test join statement
*/
@@ -692,5 +699,29 @@ public class TestJoin {
LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
}
-}
+ // See: https://issues.apache.org/jira/browse/PIG-3093
+ @Test
+ public void testIndirectSelfJoinRealias() throws Exception {
+ setUp(ExecType.LOCAL);
+ Data data = resetData(pigServer);
+
+ Set<Tuple> tuples = Sets.newHashSet(tuple("a"), tuple("b"), tuple("c"));
+ data.set("foo", Utils.getSchemaFromString("field1:chararray"), tuples);
+ pigServer.registerQuery("A = load 'foo' using mock.Storage();");
+ pigServer.registerQuery("B = foreach A generate *;");
+ pigServer.registerQuery("C = join A by field1, B by field1;");
+ assertEquals(Utils.getSchemaFromString("A::field1:chararray, B::field1:chararray"), pigServer.dumpSchema("C"));
+ pigServer.registerQuery("D = foreach C generate B::field1, A::field1 as field2;");
+ assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("D"));
+ pigServer.registerQuery("E = foreach D generate field1, field2;");
+ assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("E"));
+ pigServer.registerQuery("F = foreach E generate field2;");
+ Iterator<Tuple> it = pigServer.openIterator("F");
+ assertTrue(it.hasNext());
+ while (it.hasNext()) {
+ assertTrue(tuples.remove(it.next()));
+ }
+ assertFalse(it.hasNext());
+ }
+}
\ No newline at end of file