You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/02/27 22:12:47 UTC

svn commit: r1784664 - in /pig/trunk: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/newplan/logical/visitor/ src/org/apache/pig/parser/ test/org/apache/pig/test/

Author: rohini
Date: Mon Feb 27 22:12:47 2017
New Revision: 1784664

URL: http://svn.apache.org/viewvc?rev=1784664&view=rev
Log:
Removing schema alias and :: coming from parent relation (szita via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
    pig/trunk/test/org/apache/pig/test/TestSchema.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb 27 22:12:47 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
  
 IMPROVEMENTS
 
+PIG-5110: Removing schema alias and :: coming from parent relation (szita via rohini)
+
 PIG-5085: Support FLATTEN of maps (szita via rohini)
 
 PIG-5126. Add doc about pig in zeppelin (zjffdu)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Mon Feb 27 22:12:47 2017
@@ -5409,7 +5409,9 @@ DUMP X;
 <section id="disambiguate">
 <title>Disambiguate Operator</title>
 
-<p>Use the disambiguate operator ( :: ) to identify field names after JOIN, COGROUP, CROSS, or FLATTEN operators.</p>
+<p>After JOIN, COGROUP, CROSS, or FLATTEN operations, the field names have the original alias and the disambiguate
+   operator ( :: ) prepended in the schema. The disambiguate operator is used to identify field names in case there
+   is an ambiguity.</p>
 
 <p>In this example, to disambiguate y,  use A::y or B::y.  In cases where there is no ambiguity, such as z, the :: is not necessary but is still supported.</p>
 
@@ -5417,8 +5419,14 @@ DUMP X;
 A = load 'data1' as (x, y);
 B = load 'data2' as (x, y, z);
 C = join A by x, B by x;
-D = foreach C generate y; -- which y?
+D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to A::y or B::y
 </source>
+<p> In cases where the schema is stored as part of the StoreFunc like PigStorage, JsonStorage, AvroStorage or OrcStorage,
+   users generally have to use an extra FOREACH before STORE to rename the field names and remove the disambiguate
+   operator from the names. To automatically remove the disambiguate operator from the schema for the STORE operation,
+   the <i>pig.store.schema.disambiguate</i> Pig property can be set to "false". It is the responsibility of the user
+   to make sure that there is no conflict in the field names when using this setting.
+</p>
 </section>
 
     <!-- =================================================================== -->  
@@ -5444,7 +5452,7 @@ D = foreach C generate y; -- which y?
       to bags. For example, if we apply the expression GENERATE $0, FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]),
       we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
    </p>
-   
+
    <p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated. 
    (See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
    
@@ -6537,7 +6545,7 @@ B = FOREACH A GENERATE a, FLATTEN(m);
 C = FILTER B by m::value == 5;
 ……
 </source>
-   
+
    </section>
    
    <section id="nestedblock">

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 27 22:12:47 2017
@@ -501,6 +501,13 @@ public class PigConfiguration {
      */
     public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = "pig.tez.configure.am.memory";
 
+    /**
+     * If set to false, automatic schema disambiguation gets disabled for STORE, i.e. group::name will be stored as just name
+     */
+    public static final String PIG_STORE_SCHEMA_DISAMBIGUATE = "pig.store.schema.disambiguate";
+
+    public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true";
+
     // Deprecated settings of Pig 0.13
 
     /**

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Mon Feb 27 22:12:47 2017
@@ -36,6 +36,7 @@ public class LOStore extends LogicalRela
     private boolean isTmpStore;
     private SortInfo sortInfo;
     private final StoreFuncInterface storeFunc;
+    private boolean disambiguationEnabled = true;
 
     public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature) {
         super("LOStore", plan);
@@ -43,6 +44,12 @@ public class LOStore extends LogicalRela
         this.storeFunc = storeFunc;
         this.signature = signature;
     }
+
+    public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature,
+                   boolean disambiguationEnabled) {
+        this(plan, outputFileSpec, storeFunc, signature);
+        this.disambiguationEnabled = disambiguationEnabled;
+    }
     
     public FileSpec getOutputSpec() {
         return output;
@@ -55,6 +62,17 @@ public class LOStore extends LogicalRela
     @Override
     public LogicalSchema getSchema() throws FrontendException {
         schema = ((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema();
+
+        if (!disambiguationEnabled && schema != null && schema.getFields() != null) {
+            //If requested try and remove parent alias substring including colon(s)
+            for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) {
+                if (field.alias == null || !field.alias.contains(":")) {
+                    continue;
+                }
+                field.alias = field.alias.substring(field.alias.lastIndexOf(":") + 1);
+            }
+        }
+
         return schema;
     }
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon Feb 27 22:12:47 2017
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx
                     StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec);
                     String sig = LogicalPlanBuilder.newOperatorKey(scope);
                     stoFunc.setStoreFuncUDFContextSignature(sig);
-                    store = new LOStore(lp, fileSpec, stoFunc, sig);
+                    boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+                            getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
+
+                    store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
                     store.setTmpStore(true);
                     lp.add( store );
                     lp.connect( refOp, store );

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Feb 27 22:12:47 2017
@@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder {
                 fileNameMap.put(fileNameKey, absolutePath);
             }
             FileSpec fileSpec = new FileSpec(absolutePath, funcSpec);
+            boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+                    getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
 
-            LOStore op = new LOStore(plan, fileSpec, stoFunc, signature);
+            LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled);
             return buildOp(loc, op, alias, inputAlias, null);
         } catch(Exception ex) {
             throw new ParserValidationException(intStream, loc, ex);

Modified: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Mon Feb 27 22:12:47 2017
@@ -70,8 +70,10 @@ public class QueryParserUtils {
         fileName = removeQuotes( fileName );
         FileSpec fileSpec = new FileSpec( fileName, funcSpec );
         String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
+        boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+                getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
         stoFunc.setStoreFuncUDFContextSignature(sig);
-        LOStore store = new LOStore(lp, fileSpec, stoFunc, sig);
+        LOStore store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
         store.setAlias(alias);
 
         try {

Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Mon Feb 27 22:12:47 2017
@@ -29,7 +29,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.data.DataType;
@@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
 import org.apache.pig.parser.ParserException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestSchema {
 
+    private static MiniGenericCluster cluster;
+    private static PigServer pigServer;
+
+    @BeforeClass
+    public static void setupTestCluster() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    }
+
+    @AfterClass
+    public static void tearDownTestCluster() throws Exception {
+        cluster.shutDown();
+    }
+
     @Test
     public void testSchemaEqual1() {
 
@@ -660,8 +680,6 @@ public class TestSchema {
 
     @Test
     public void testSchemaSerialization() throws IOException {
-        MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         String inputFileName = "testSchemaSerialization-input.txt";
         String[] inputData = new String[] { "foo\t1", "hello\t2" };
         Util.createInputFile(cluster, inputFileName, inputData);
@@ -673,7 +691,6 @@ public class TestSchema {
             Tuple t = it.next();
             assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
         }
-        cluster.shutDown();
     }
 
     @Test
@@ -938,4 +955,79 @@ public class TestSchema {
             assertTrue(schemaString.equals(s2));
         }
     }
+
+    @Test
+    public void testDisabledDisambiguationContainsNoColons() throws IOException {
+        resetDisambiguationTestPropertyOverride();
+
+        String inputFileName = "testPrepend-input.txt";
+        String[] inputData = new String[]{"apple\t1\tred", "orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"};
+        Util.createInputFile(cluster, inputFileName, inputData);
+
+        String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray, foo:int, color: chararray);" +
+                "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);" +
+                "C = GROUP A BY (fruit,color);" +
+                "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" +
+                "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as avgFoo;" +
+                "E = JOIN B BY id, D BY group::fruit;" +
+                "F = UNION ONSCHEMA B, D2;" +
+                "G = CROSS B, D2;";
+
+        Util.registerMultiLineQuery(pigServer, script);
+
+        //Prepending should happen with default settings
+        assertEquals("{B::id: chararray,B::bar: int,D::group::fruit: chararray,D::group::color: chararray,double}", pigServer.dumpSchema("E").toString());
+
+        //Override prepend property setting (check for flatten, join)
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+        assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,double}", pigServer.dumpSchema("E").toString());
+        assertTrue(pigServer.openIterator("E").hasNext());
+
+        //Check for union and cross
+        assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("F").toString());
+        assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("G").toString());
+
+    }
+
+    @Test
+    public void testEnabledDisambiguationPassesForDupeAliases() throws IOException {
+        resetDisambiguationTestPropertyOverride();
+
+        checkForDupeAliases();
+
+        //Should pass with default settings
+        assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val: int}", pigServer.dumpSchema("C").toString());
+        assertTrue(pigServer.openIterator("C").hasNext());
+    }
+
+    @Test
+    public void testDisabledDisambiguationFailsForDupeAliases() throws IOException {
+        resetDisambiguationTestPropertyOverride();
+
+        try {
+            checkForDupeAliases();
+            //Should fail with prepending disabled
+            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+            pigServer.dumpSchema("C");
+        } catch (FrontendException e){
+            Assert.assertEquals("Duplicate schema alias: id in \"fake\"",e.getCause().getMessage());
+        }
+    }
+
+    private static void checkForDupeAliases() throws IOException {
+        String inputFileName = "testPrependFail-input" + UUID.randomUUID().toString() + ".txt";
+        String[] inputData = new String[]{"foo\t1", "bar\t2"};
+        Util.createInputFile(cluster, inputFileName, inputData);
+
+        String script = "A = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" +
+                "B = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" +
+                "C = JOIN A by id, B by id;";
+
+        Util.registerMultiLineQuery(pigServer, script);
+    }
+
+    private static void resetDisambiguationTestPropertyOverride() {
+        //Reset possible overrides
+        pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE);
+    }
 }