You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/02/27 22:12:47 UTC
svn commit: r1784664 - in /pig/trunk: ./
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/
src/org/apache/pig/newplan/logical/relational/
src/org/apache/pig/newplan/logical/visitor/ src/org/apache/pig/parser/
test/org/apache/pig/test/
Author: rohini
Date: Mon Feb 27 22:12:47 2017
New Revision: 1784664
URL: http://svn.apache.org/viewvc?rev=1784664&view=rev
Log:
Removing schema alias and :: coming from parent relation (szita via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
pig/trunk/test/org/apache/pig/test/TestSchema.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb 27 22:12:47 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
IMPROVEMENTS
+PIG-5110: Removing schema alias and :: coming from parent relation (szita via rohini)
+
PIG-5085: Support FLATTEN of maps (szita via rohini)
PIG-5126. Add doc about pig in zeppelin (zjffdu)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Mon Feb 27 22:12:47 2017
@@ -5409,7 +5409,9 @@ DUMP X;
<section id="disambiguate">
<title>Disambiguate Operator</title>
-<p>Use the disambiguate operator ( :: ) to identify field names after JOIN, COGROUP, CROSS, or FLATTEN operators.</p>
+<p>After JOIN, COGROUP, CROSS, or FLATTEN operations, each field name in the schema is prefixed with the original
+ relation alias followed by the disambiguate operator ( :: ). Use this prefix to identify a field when its name alone
+ is ambiguous.</p>
<p>In this example, to disambiguate y, use A::y or B::y. In cases where there is no ambiguity, such as z, the :: is not necessary but is still supported.</p>
@@ -5417,8 +5419,14 @@ DUMP X;
A = load 'data1' as (x, y);
B = load 'data2' as (x, y, z);
C = join A by x, B by x;
-D = foreach C generate y; -- which y?
+D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to A::y or B::y
</source>
+<p> When the StoreFunc saves the schema along with the data, as PigStorage, JsonStorage, AvroStorage or OrcStorage can,
+ users generally have to add an extra FOREACH before STORE to rename the fields and remove the disambiguate
+ operator from their names. To remove the disambiguate operator from the schema automatically for the STORE operation,
+ set the <i>pig.store.schema.disambiguate</i> Pig property to "false". Everything up to and including the last :: is then
+ dropped from each top-level field name (for example, D::group::fruit is stored as fruit); nested field names are not changed.
+ It is the responsibility of the user to make sure that there is no conflict in the field names when using this setting.
+</p>
</section>
<!-- =================================================================== -->
@@ -5444,7 +5452,7 @@ D = foreach C generate y; -- which y?
to bags. For example, if we apply the expression GENERATE $0, FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]),
we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
</p>
-
+
<p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated.
(See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
@@ -6537,7 +6545,7 @@ B = FOREACH A GENERATE a, FLATTEN(m);
C = FILTER B by m::value == 5;
……
</source>
-
+
</section>
<section id="nestedblock">
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 27 22:12:47 2017
@@ -501,6 +501,13 @@ public class PigConfiguration {
*/
public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = "pig.tez.configure.am.memory";
+ /**
+ * Controls whether the schema handed to STORE keeps the disambiguation prefixes ("alias::")
+ * that JOIN, COGROUP, CROSS and FLATTEN put on field names.
+ * If set to false, automatic schema disambiguation gets disabled for STORE, i.e. group::name will be just name
+ * (only top-level field names are rewritten).
+ * The value is read with Boolean.parseBoolean, so any value other than "true" (case-insensitive) disables it.
+ */
+ public static final String PIG_STORE_SCHEMA_DISAMBIGUATE = "pig.store.schema.disambiguate";
+
+ /** Default for {@link #PIG_STORE_SCHEMA_DISAMBIGUATE}: keep the disambiguation prefixes. */
+ public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true";
+
// Deprecated settings of Pig 0.13
/**
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Mon Feb 27 22:12:47 2017
@@ -36,6 +36,7 @@ public class LOStore extends LogicalRela
private boolean isTmpStore;
private SortInfo sortInfo;
private final StoreFuncInterface storeFunc;
+ private boolean disambiguationEnabled = true;
public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature) {
super("LOStore", plan);
@@ -43,6 +44,12 @@ public class LOStore extends LogicalRela
this.storeFunc = storeFunc;
this.signature = signature;
}
+
+ /**
+ * Creates a store operator and records whether its schema keeps disambiguation prefixes.
+ *
+ * @param plan the plan this operator belongs to
+ * @param outputFileSpec output location and store function spec
+ * @param storeFunc store function instance
+ * @param signature UDF context signature for the store function
+ * @param disambiguationEnabled if false, getSchema() drops everything up to and including the last ':'
+ * from each top-level field alias that contains one (e.g. "A::x" becomes "x")
+ */
+ public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature,
+ boolean disambiguationEnabled) {
+ this(plan, outputFileSpec, storeFunc, signature);
+ this.disambiguationEnabled = disambiguationEnabled;
+ }
public FileSpec getOutputSpec() {
return output;
@@ -55,6 +62,17 @@ public class LOStore extends LogicalRela
@Override
public LogicalSchema getSchema() throws FrontendException {
schema = ((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema();
+
+ if (!disambiguationEnabled && schema != null && schema.getFields() != null) {
+ //If requested try and remove parent alias substring including colon(s)
+ for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) {
+ if (field.alias == null || !field.alias.contains(":")) {
+ continue;
+ }
+ field.alias = field.alias.substring(field.alias.lastIndexOf(":") + 1);
+ }
+ }
+
return schema;
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon Feb 27 22:12:47 2017
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx
StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec);
String sig = LogicalPlanBuilder.newOperatorKey(scope);
stoFunc.setStoreFuncUDFContextSignature(sig);
- store = new LOStore(lp, fileSpec, stoFunc, sig);
+ boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
+
+ store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
store.setTmpStore(true);
lp.add( store );
lp.connect( refOp, store );
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Feb 27 22:12:47 2017
@@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder {
fileNameMap.put(fileNameKey, absolutePath);
}
FileSpec fileSpec = new FileSpec(absolutePath, funcSpec);
+ boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
- LOStore op = new LOStore(plan, fileSpec, stoFunc, signature);
+ LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled);
return buildOp(loc, op, alias, inputAlias, null);
} catch(Exception ex) {
throw new ParserValidationException(intStream, loc, ex);
Modified: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Mon Feb 27 22:12:47 2017
@@ -70,8 +70,10 @@ public class QueryParserUtils {
fileName = removeQuotes( fileName );
FileSpec fileSpec = new FileSpec( fileName, funcSpec );
String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
+ boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
stoFunc.setStoreFuncUDFContextSignature(sig);
- LOStore store = new LOStore(lp, fileSpec, stoFunc, sig);
+ LOStore store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
store.setAlias(alias);
try {
Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Mon Feb 27 22:12:47 2017
@@ -29,7 +29,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;
@@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
import org.apache.pig.parser.ParserException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSchema {
+ private static MiniGenericCluster cluster;
+ private static PigServer pigServer;
+
+ @BeforeClass
+ public static void setupTestCluster() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ }
+
+ @AfterClass
+ public static void tearDownTestCluster() throws Exception {
+ cluster.shutDown();
+ }
+
@Test
public void testSchemaEqual1() {
@@ -660,8 +680,6 @@ public class TestSchema {
@Test
public void testSchemaSerialization() throws IOException {
- MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
String inputFileName = "testSchemaSerialization-input.txt";
String[] inputData = new String[] { "foo\t1", "hello\t2" };
Util.createInputFile(cluster, inputFileName, inputData);
@@ -673,7 +691,6 @@ public class TestSchema {
Tuple t = it.next();
assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
}
- cluster.shutDown();
}
@Test
@@ -938,4 +955,79 @@ public class TestSchema {
assertTrue(schemaString.equals(s2));
}
}
+
+ @Test
+ public void testDisabledDisambiguationContainsNoColons() throws IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ String inputFileName = "testPrepend-input.txt";
+ String[] inputData = new String[]{"apple\t1\tred", "orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"};
+ Util.createInputFile(cluster, inputFileName, inputData);
+
+ String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray, foo:int, color: chararray);" +
+ "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);" +
+ "C = GROUP A BY (fruit,color);" +
+ "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" +
+ "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as avgFoo;" +
+ "E = JOIN B BY id, D BY group::fruit;" +
+ "F = UNION ONSCHEMA B, D2;" +
+ "G = CROSS B, D2;";
+
+ Util.registerMultiLineQuery(pigServer, script);
+
+ try {
+ //Prepending should happen with default settings
+ assertEquals("{B::id: chararray,B::bar: int,D::group::fruit: chararray,D::group::color: chararray,double}", pigServer.dumpSchema("E").toString());
+
+ //Override prepend property setting (check for flatten, join)
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,double}", pigServer.dumpSchema("E").toString());
+ assertTrue(pigServer.openIterator("E").hasNext());
+
+ //Check for union and cross
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("F").toString());
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("G").toString());
+ } finally {
+ // pigServer is shared by the whole class; don't leak the "false" override into later tests
+ resetDisambiguationTestPropertyOverride();
+ }
+ }
+
+ @Test
+ public void testEnabledDisambiguationPassesForDupeAliases() throws IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ checkForDupeAliases();
+
+ //Should pass with default settings
+ assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val: int}", pigServer.dumpSchema("C").toString());
+ assertTrue(pigServer.openIterator("C").hasNext());
+ }
+
+ @Test
+ public void testDisabledDisambiguationFailsForDupeAliases() throws IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ // Registration itself must succeed: disambiguation is still enabled at this point
+ checkForDupeAliases();
+ //Should fail with prepending disabled
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+ try {
+ pigServer.dumpSchema("C");
+ // Without this, the test would pass silently when no exception is thrown
+ Assert.fail("Expected FrontendException for duplicate alias 'id' with disambiguation disabled");
+ } catch (FrontendException e){
+ Assert.assertNotNull("FrontendException should carry the duplicate-alias cause", e.getCause());
+ Assert.assertEquals("Duplicate schema alias: id in \"fake\"",e.getCause().getMessage());
+ } finally {
+ // pigServer is shared by the whole class; don't leak the "false" override into later tests
+ resetDisambiguationTestPropertyOverride();
+ }
+ }
+
+ /**
+ * Registers a query that JOINs two relations with identical field names (id, val), loaded from a
+ * freshly created, uniquely named input file, and leaves the result under alias C.
+ * Despite its name, this method performs no checks itself.
+ */
+ private static void checkForDupeAliases() throws IOException {
+ // Unique name: this helper runs from more than one test against the same cluster
+ String fileName = "testPrependFail-input" + UUID.randomUUID().toString() + ".txt";
+ Util.createInputFile(cluster, fileName, new String[]{"foo\t1", "bar\t2"});
+
+ // Both relations use the same schema, so the JOIN output has duplicate unprefixed names
+ String loadClause = " = LOAD '" + fileName + "' AS (id:chararray, val:int);";
+ StringBuilder script = new StringBuilder();
+ script.append("A").append(loadClause);
+ script.append("B").append(loadClause);
+ script.append("C = JOIN A by id, B by id;");
+
+ Util.registerMultiLineQuery(pigServer, script.toString());
+ }
+
+ /**
+ * Removes any test override of {@link PigConfiguration#PIG_STORE_SCHEMA_DISAMBIGUATE} from the shared
+ * PigServer's properties, so the store-building code falls back to PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT.
+ */
+ private static void resetDisambiguationTestPropertyOverride() {
+ //Reset possible overrides
+ pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE);
+ }
}