You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/02/17 18:23:58 UTC

svn commit: r1783444 - in /pig/trunk: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/

Author: rohini
Date: Fri Feb 17 18:23:57 2017
New Revision: 1783444

URL: http://svn.apache.org/viewvc?rev=1783444&view=rev
Log:
PIG-5085: Support FLATTEN of maps (szita via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    pig/trunk/test/org/apache/pig/test/TestPOGenerate.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Feb 17 18:23:57 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
  
 IMPROVEMENTS
 
+PIG-5085: Support FLATTEN of maps (szita via rohini)
+
 PIG-5126. Add doc about pig in zeppelin (zjffdu)
 
 PIG-5120: Let tez_local mode run without a jar file (knoguchi)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Fri Feb 17 18:23:57 2017
@@ -5425,8 +5425,8 @@ D = foreach C generate y; -- which y?
    <section  id="flatten">
    <title>Flatten Operator</title>
    <p>The FLATTEN operator looks like a UDF syntactically, but it is actually an operator that changes the structure of tuples 
-   and bags in a way that a UDF cannot. Flatten un-nests tuples as well as bags. The idea is the same, but the operation and 
-   result is different for each type of structure.</p>
+   and bags in a way that a UDF cannot. Flatten un-nests tuples, bags and maps. The idea is the
+      same, but the operation and the result are different for each type of structure.</p>
 
    <p>For tuples, flatten substitutes the fields of a tuple in place of the tuple. For example, consider a relation that has a tuple 
    of the form (a, (b, c)). The expression GENERATE $0, flatten($1), will cause that tuple to become (a, b, c).</p>
@@ -5436,6 +5436,14 @@ D = foreach C generate y; -- which y?
    tuples (b,c) and (d,e). When we remove a level of nesting in a bag, sometimes we cause a cross product to happen. 
    For example, consider a relation that has a tuple of the form (a, {(b,c), (d,e)}), commonly produced by the GROUP operator. 
    If we apply the expression GENERATE $0, flatten($1) to this tuple, we will create new tuples: (a, b, c) and (a, d, e).</p>
+
+   <p>For maps, flatten creates one tuple per map entry, with two fields containing the key and the value.
+      If we have a map field named kvpair with input ([k1#v1, k2#v2]) and we apply GENERATE flatten(kvpair),
+      it will generate two tuples (k1,v1) and (k2,v2), whose fields can be accessed as kvpair::key and
+      kvpair::value.<br/>When there are additional projections in the expression, a cross product is produced, just as
+      with bags. For example, if we apply the expression GENERATE $0, FLATTEN($1) to the input tuple (a, [k1#1, k2#2, k3#3]),
+      we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result (the order of the map entries is not guaranteed).
+   </p>
    
    <p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated. 
    (See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
@@ -6519,6 +6527,16 @@ D = FOREACH C GENERATE flatten(A), flatt
 E = GROUP D BY A::x;
 ……
 </source>
+
+   <p>A FLATTEN example on a map type. Here we load an integer and a map (of integer values) into A. Then m is
+      flattened, and finally we filter the result to include only the tuples where the value of the un-nested
+      map entry is 5.</p>
+<source>
+A = LOAD 'data' AS (a:int, m:map[int]);
+B = FOREACH A GENERATE a, FLATTEN(m);
+C = FILTER B by m::value == 5;
+……
+</source>
    
    </section>
    

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Feb 17 18:23:57 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -442,6 +443,8 @@ public class POForEach extends PhysicalO
 
                 if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
                     its[i] = ((DataBag)bags[i]).iterator();
+                } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
+                    its[i] = ((Map)bags[i]).entrySet().iterator();
                 } else {
                     its[i] = null;
                 }
@@ -467,7 +470,7 @@ public class POForEach extends PhysicalO
                 //we instantiate the template array and start populating it with data
                 data = new Object[noItems];
                 for(int i = 0; i < noItems; ++i) {
-                    if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
+                    if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
                         if(its[i].hasNext()) {
                             data[i] = its[i].next();
                         } else {
@@ -541,6 +544,15 @@ public class POForEach extends PhysicalO
                     out.append(t.get(j));
                 }
                 }
+            } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+                Map.Entry entry = (Map.Entry)in;
+                if (knownSize) {
+                    out.set(idx++, entry.getKey());
+                    out.set(idx++, entry.getValue());
+                } else {
+                    out.append(entry.getKey());
+                    out.append(entry.getValue());
+                }
             } else {
                 if (knownSize) {
                     out.set(idx++, in);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Fri Feb 17 18:23:57 2017
@@ -95,17 +95,17 @@ public class LOGenerate extends LogicalR
                 fieldSchema = exp.getFieldSchema().deepCopy();
                 
                 expSchema = new LogicalSchema();
-                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) {
+                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
                     // if type is primitive, just add to schema
-                    if (fieldSchema!=null)
+                    if (fieldSchema != null)
                         expSchema.addField(fieldSchema);
                 } else {
-                    // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
+                    // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
                     if (fieldSchema.schema==null) {
                         expSchema = null;
                     }
                     else {
-                     // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
+                     // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
                         List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
                         if (flattenFlags[i]) {
                             if (fieldSchema.type == DataType.BAG) {
@@ -117,13 +117,23 @@ public class LOGenerate extends LogicalR
                                         fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                     }
                                 }
+                            } else if (fieldSchema.type == DataType.MAP) {
+                                //should only contain 1 schemafield for Map's value
+                                innerFieldSchemas = fieldSchema.schema.getFields();
+                                LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
+                                fsForValue.alias = fieldSchema.alias + "::value";
+
+                                LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
+                                        fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
+
+                                expSchema.addField(fsForKey);
                             } else { // DataType.TUPLE
                                 innerFieldSchemas = fieldSchema.schema.getFields();
                                 for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                     fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                 }
                             }
-                            
+
                             for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
                                 expSchema.addField(fs);
                         }

Modified: pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Feb 17 18:23:57 2017
@@ -2072,7 +2072,25 @@ public class TestLogicalPlanBuilder {
         assertTrue("Sink must end with output", lData.getSinks().get(0).endsWith("output"));
         assertEquals("Number of logical relational operators must be 4", lData.getNumLogicalRelationOperators(), 4);
     }
-    
+
+    @Test
+    public void testFlattenMap() throws Exception {
+       String query = "A = LOAD 'input.txt' as (rowId:int, dataMap:map[int]);" +
+               "B = FOREACH A GENERATE rowId, FLATTEN(dataMap);";
+
+        pigServer.registerQuery(query);
+        Schema schema = pigServer.dumpSchema("B");
+
+        assertEquals(3, schema.size());
+
+        assertEquals(DataType.INTEGER, schema.getField(0).type);
+        assertEquals("rowId", schema.getField(0).alias);
+
+        assertEquals(DataType.CHARARRAY, schema.getField(1).type);
+        assertEquals("dataMap::key", schema.getField(1).alias);
+        assertEquals(DataType.INTEGER, schema.getField(2).type);
+        assertEquals("dataMap::value", schema.getField(2).alias);
+    }
     /**
      * This method is not generic. Expects logical plan to have atleast
      * 1 source and returns the corresponding FuncSpec.

Modified: pig/trunk/test/org/apache/pig/test/TestPOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOGenerate.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOGenerate.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOGenerate.java Fri Feb 17 18:23:57 2017
@@ -21,8 +21,10 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -46,6 +48,7 @@ public class TestPOGenerate {
     DataBag cogroup;
     DataBag partialFlatten;
     DataBag simpleGenerate;
+    DataBag mapFlatten;
     Random r = new Random();
     BagFactory bf = BagFactory.getInstance();
     TupleFactory tf = TupleFactory.getInstance();
@@ -54,10 +57,25 @@ public class TestPOGenerate {
     public void setUp() throws Exception {
         Tuple [] inputA = new Tuple[4];
         Tuple [] inputB = new Tuple[4];
+        Tuple [] inputC = new Tuple[4];
         for(int i = 0; i < 4; i++) {
             inputA[i] = tf.newTuple(2);
             inputB[i] = tf.newTuple(1);
+            inputC[i] = tf.newTuple(2);
         }
+        Map map0 = new HashMap<String,String>();
+        Map map1 = new HashMap<String,String>();
+        Map map2 = new HashMap<String,String>();
+        Map map3 = new HashMap<String,String>();
+        map0.put("A","");
+        map0.put("B","");
+        map1.put("A","a");
+        map1.put("B","b");
+        map2.put("A","aa");
+        map2.put("B","bb");
+        map3.put("A","aaa");
+        map3.put("B","bbb");
+
         inputA[0].set(0, 'a');
         inputA[0].set(1, '1');
         inputA[1].set(0, 'b');
@@ -70,6 +88,15 @@ public class TestPOGenerate {
         inputB[1].set(0, 'b');
         inputB[2].set(0, 'a');
         inputB[3].set(0, 'd');
+        inputC[0].set(0, 0);
+        inputC[0].set(1, map0);
+        inputC[1].set(0, 1);
+        inputC[1].set(1, map1);
+        inputC[2].set(0, 2);
+        inputC[2].set(1, map2);
+        inputC[3].set(0, 3);
+        inputC[3].set(1, map3);
+
         DataBag cg11 = bf.newDefaultBag();
         cg11.add(inputA[0]);
         cg11.add(inputA[2]);
@@ -119,15 +146,22 @@ public class TestPOGenerate {
         tPartial[3].append(emptyBag);
 
         partialFlatten = bf.newDefaultBag();
-        for(int i = 0; i < 4; ++i) {
+        for (int i = 0; i < 4; ++i) {
             partialFlatten.add(tPartial[i]);
         }
 
         simpleGenerate = bf.newDefaultBag();
-        for(int i = 0; i < 4; ++i) {
+        for (int i = 0; i < 4; ++i) {
             simpleGenerate.add(inputA[i]);
         }
 
+
+        mapFlatten = bf.newDefaultBag();
+        for (int i = 0; i < inputC.length; ++i) {
+            mapFlatten.add(inputC[i]);
+        }
+
+
         //System.out.println("Cogroup : " + cogroup);
         //System.out.println("Partial : " + partialFlatten);
         //System.out.println("Simple : " + simpleGenerate);
@@ -248,4 +282,49 @@ public class TestPOGenerate {
         assertEquals(simpleGenerate.size(), count);
 
     }
+
+    @Test
+    public void testMapFlattenGenerate() throws Exception {
+        ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.INTEGER);
+        prj2.setResultType(DataType.MAP);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(false);
+        toBeFlattened.add(true);
+        PhysicalPlan plan1 = new PhysicalPlan();
+        plan1.add(prj1);
+        PhysicalPlan plan2 = new PhysicalPlan();
+        plan2.add(prj2);
+        List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
+
+        List<String> obtained = new LinkedList<String>();
+        for (Tuple t : mapFlatten) {
+            poGen.attachInput(t);
+            Result output = poGen.getNextTuple();
+            while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+                //System.out.println(output.result);
+                obtained.add(((Tuple) output.result).toString());
+                output = poGen.getNextTuple();
+            }
+        }
+
+        int count = 0;
+        for (Tuple t : mapFlatten) {
+            Tuple expected = tf.newTuple(3);
+            expected.set(0, t.get(0));
+            for (Object entryObj : ((Map)t.get(1)).entrySet()){
+                Map.Entry entry = ((Map.Entry)entryObj);
+                expected.set(1, entry.getKey());
+                expected.set(2, entry.getValue());
+                assertTrue(obtained.contains(expected.toString()));
+                ++count;
+            }
+        }
+        assertEquals(mapFlatten.size()*2, count);
+
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Fri Feb 17 18:23:57 2017
@@ -57,6 +57,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
@@ -1473,13 +1474,13 @@ public class TestPruneColumn {
     }
 
     @Test
-    public void testRelayFlattenMap() throws Exception {
+    public void testFlattenMapCantPruneKeys() throws Exception {
         pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
-                + "' as (a0, a1:map[]);");
+                + "' as (a0, a1:map[int]);");
 
         pigServer.registerQuery("B = foreach A generate flatten(a1);");
-        pigServer.registerQuery("C = foreach B generate a1#'key1';");
-
+        pigServer.registerQuery("B1 = filter B by a1::key == 'key1';");
+        pigServer.registerQuery("C = foreach B1 generate a1::value;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
         assertTrue(iter.hasNext());
@@ -1494,8 +1495,7 @@ public class TestPruneColumn {
 
         assertFalse(iter.hasNext());
 
-        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
-                "Map key required for A: $1->[key1]"}));
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
     }
 
     @Test