You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/02/17 18:23:58 UTC
svn commit: r1783444 - in /pig/trunk: ./
src/docs/src/documentation/content/xdocs/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/
Author: rohini
Date: Fri Feb 17 18:23:57 2017
New Revision: 1783444
URL: http://svn.apache.org/viewvc?rev=1783444&view=rev
Log:
PIG-5085: Support FLATTEN of maps (szita via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
pig/trunk/test/org/apache/pig/test/TestPOGenerate.java
pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Feb 17 18:23:57 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
IMPROVEMENTS
+PIG-5085: Support FLATTEN of maps (szita via rohini)
+
PIG-5126. Add doc about pig in zeppelin (zjffdu)
PIG-5120: Let tez_local mode run without a jar file (knoguchi)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Fri Feb 17 18:23:57 2017
@@ -5425,8 +5425,8 @@ D = foreach C generate y; -- which y?
<section id="flatten">
<title>Flatten Operator</title>
<p>The FLATTEN operator looks like a UDF syntactically, but it is actually an operator that changes the structure of tuples
- and bags in a way that a UDF cannot. Flatten un-nests tuples as well as bags. The idea is the same, but the operation and
- result is different for each type of structure.</p>
+ and bags in a way that a UDF cannot. Flatten un-nests tuples, bags and maps. The idea is the
+ same, but the operation and result are different for each type of structure.</p>
<p>For tuples, flatten substitutes the fields of a tuple in place of the tuple. For example, consider a relation that has a tuple
of the form (a, (b, c)). The expression GENERATE $0, flatten($1), will cause that tuple to become (a, b, c).</p>
@@ -5436,6 +5436,14 @@ D = foreach C generate y; -- which y?
tuples (b,c) and (d,e). When we remove a level of nesting in a bag, sometimes we cause a cross product to happen.
For example, consider a relation that has a tuple of the form (a, {(b,c), (d,e)}), commonly produced by the GROUP operator.
If we apply the expression GENERATE $0, flatten($1) to this tuple, we will create new tuples: (a, b, c) and (a, d, e).</p>
+
+ <p>For maps, flatten creates one tuple per map entry, each with two fields containing the entry's key and value.
+ If we have a map field named kvpair with input as (m[k1#v1, k2#v2]) and we apply GENERATE flatten(kvpair),
+ it will generate two tuples (k1,v1) and (k2,v2), whose fields can be accessed as kvpair::key and
+ kvpair::value.<br/>When there are additional projections in the expression, a cross product will happen, similar
+ to bags. For example, if we apply the expression GENERATE $0, FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]),
+ we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
+ </p>
<p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated.
(See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
@@ -6519,6 +6527,16 @@ D = FOREACH C GENERATE flatten(A), flatt
E = GROUP D BY A::x;
……
</source>
+
+ <p>A FLATTEN example on a map type. Here we load an integer and a map (of integer values) into A. Then m is
+ flattened, and finally we filter the result to include only the tuples whose un-nested map entry has
+ the value 5.</p>
+<source>
+A = LOAD 'data' AS (a:int, m:map[int]);
+B = FOREACH A GENERATE a, FLATTEN(m);
+C = FILTER B by m::value == 5;
+……
+</source>
</section>
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Feb 17 18:23:57 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -442,6 +443,8 @@ public class POForEach extends PhysicalO
if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
its[i] = ((DataBag)bags[i]).iterator();
+ } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
+ its[i] = ((Map)bags[i]).entrySet().iterator();
} else {
its[i] = null;
}
@@ -467,7 +470,7 @@ public class POForEach extends PhysicalO
//we instantiate the template array and start populating it with data
data = new Object[noItems];
for(int i = 0; i < noItems; ++i) {
- if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
+ if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
if(its[i].hasNext()) {
data[i] = its[i].next();
} else {
@@ -541,6 +544,15 @@ public class POForEach extends PhysicalO
out.append(t.get(j));
}
}
+ } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+ Map.Entry entry = (Map.Entry)in;
+ if (knownSize) {
+ out.set(idx++, entry.getKey());
+ out.set(idx++, entry.getValue());
+ } else {
+ out.append(entry.getKey());
+ out.append(entry.getValue());
+ }
} else {
if (knownSize) {
out.set(idx++, in);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Fri Feb 17 18:23:57 2017
@@ -95,17 +95,17 @@ public class LOGenerate extends LogicalR
fieldSchema = exp.getFieldSchema().deepCopy();
expSchema = new LogicalSchema();
- if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) {
+ if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
// if type is primitive, just add to schema
- if (fieldSchema!=null)
+ if (fieldSchema != null)
expSchema.addField(fieldSchema);
} else {
- // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
+ // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
if (fieldSchema.schema==null) {
expSchema = null;
}
else {
- // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
+ // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
if (flattenFlags[i]) {
if (fieldSchema.type == DataType.BAG) {
@@ -117,13 +117,23 @@ public class LOGenerate extends LogicalR
fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
}
}
+ } else if (fieldSchema.type == DataType.MAP) {
+ //should only contain 1 schemafield for Map's value
+ innerFieldSchemas = fieldSchema.schema.getFields();
+ LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
+ fsForValue.alias = fieldSchema.alias + "::value";
+
+ LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
+ fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
+
+ expSchema.addField(fsForKey);
} else { // DataType.TUPLE
innerFieldSchemas = fieldSchema.schema.getFields();
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
}
}
-
+
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
expSchema.addField(fs);
}
Modified: pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Feb 17 18:23:57 2017
@@ -2072,7 +2072,25 @@ public class TestLogicalPlanBuilder {
assertTrue("Sink must end with output", lData.getSinks().get(0).endsWith("output"));
assertEquals("Number of logical relational operators must be 4", lData.getNumLogicalRelationOperators(), 4);
}
-
+
+ @Test
+ public void testFlattenMap() throws Exception {
+ String query = "A = LOAD 'input.txt' as (rowId:int, dataMap:map[int]);" +
+ "B = FOREACH A GENERATE rowId, FLATTEN(dataMap);";
+
+ pigServer.registerQuery(query);
+ Schema schema = pigServer.dumpSchema("B");
+
+ assertEquals(3, schema.size());
+
+ assertEquals(DataType.INTEGER, schema.getField(0).type);
+ assertEquals("rowId", schema.getField(0).alias);
+
+ assertEquals(DataType.CHARARRAY, schema.getField(1).type);
+ assertEquals("dataMap::key", schema.getField(1).alias);
+ assertEquals(DataType.INTEGER, schema.getField(2).type);
+ assertEquals("dataMap::value", schema.getField(2).alias);
+ }
/**
* This method is not generic. Expects logical plan to have atleast
* 1 source and returns the corresponding FuncSpec.
Modified: pig/trunk/test/org/apache/pig/test/TestPOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOGenerate.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOGenerate.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOGenerate.java Fri Feb 17 18:23:57 2017
@@ -21,8 +21,10 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -46,6 +48,7 @@ public class TestPOGenerate {
DataBag cogroup;
DataBag partialFlatten;
DataBag simpleGenerate;
+ DataBag mapFlatten;
Random r = new Random();
BagFactory bf = BagFactory.getInstance();
TupleFactory tf = TupleFactory.getInstance();
@@ -54,10 +57,25 @@ public class TestPOGenerate {
public void setUp() throws Exception {
Tuple [] inputA = new Tuple[4];
Tuple [] inputB = new Tuple[4];
+ Tuple [] inputC = new Tuple[4];
for(int i = 0; i < 4; i++) {
inputA[i] = tf.newTuple(2);
inputB[i] = tf.newTuple(1);
+ inputC[i] = tf.newTuple(2);
}
+ Map map0 = new HashMap<String,String>();
+ Map map1 = new HashMap<String,String>();
+ Map map2 = new HashMap<String,String>();
+ Map map3 = new HashMap<String,String>();
+ map0.put("A","");
+ map0.put("B","");
+ map1.put("A","a");
+ map1.put("B","b");
+ map2.put("A","aa");
+ map2.put("B","bb");
+ map3.put("A","aaa");
+ map3.put("B","bbb");
+
inputA[0].set(0, 'a');
inputA[0].set(1, '1');
inputA[1].set(0, 'b');
@@ -70,6 +88,15 @@ public class TestPOGenerate {
inputB[1].set(0, 'b');
inputB[2].set(0, 'a');
inputB[3].set(0, 'd');
+ inputC[0].set(0, 0);
+ inputC[0].set(1, map0);
+ inputC[1].set(0, 1);
+ inputC[1].set(1, map1);
+ inputC[2].set(0, 2);
+ inputC[2].set(1, map2);
+ inputC[3].set(0, 3);
+ inputC[3].set(1, map3);
+
DataBag cg11 = bf.newDefaultBag();
cg11.add(inputA[0]);
cg11.add(inputA[2]);
@@ -119,15 +146,22 @@ public class TestPOGenerate {
tPartial[3].append(emptyBag);
partialFlatten = bf.newDefaultBag();
- for(int i = 0; i < 4; ++i) {
+ for (int i = 0; i < 4; ++i) {
partialFlatten.add(tPartial[i]);
}
simpleGenerate = bf.newDefaultBag();
- for(int i = 0; i < 4; ++i) {
+ for (int i = 0; i < 4; ++i) {
simpleGenerate.add(inputA[i]);
}
+
+ mapFlatten = bf.newDefaultBag();
+ for (int i = 0; i < inputC.length; ++i) {
+ mapFlatten.add(inputC[i]);
+ }
+
+
//System.out.println("Cogroup : " + cogroup);
//System.out.println("Partial : " + partialFlatten);
//System.out.println("Simple : " + simpleGenerate);
@@ -248,4 +282,49 @@ public class TestPOGenerate {
assertEquals(simpleGenerate.size(), count);
}
+
+ @Test
+ public void testMapFlattenGenerate() throws Exception {
+ ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ prj1.setResultType(DataType.INTEGER);
+ prj2.setResultType(DataType.MAP);
+ List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+ toBeFlattened.add(false);
+ toBeFlattened.add(true);
+ PhysicalPlan plan1 = new PhysicalPlan();
+ plan1.add(prj1);
+ PhysicalPlan plan2 = new PhysicalPlan();
+ plan2.add(prj2);
+ List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
+ inputs.add(plan1);
+ inputs.add(plan2);
+ PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
+
+ List<String> obtained = new LinkedList<String>();
+ for (Tuple t : mapFlatten) {
+ poGen.attachInput(t);
+ Result output = poGen.getNextTuple();
+ while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+ //System.out.println(output.result);
+ obtained.add(((Tuple) output.result).toString());
+ output = poGen.getNextTuple();
+ }
+ }
+
+ int count = 0;
+ for (Tuple t : mapFlatten) {
+ Tuple expected = tf.newTuple(3);
+ expected.set(0, t.get(0));
+ for (Object entryObj : ((Map)t.get(1)).entrySet()){
+ Map.Entry entry = ((Map.Entry)entryObj);
+ expected.set(1, entry.getKey());
+ expected.set(2, entry.getValue());
+ assertTrue(obtained.contains(expected.toString()));
+ ++count;
+ }
+ }
+ assertEquals(mapFlatten.size()*2, count);
+
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1783444&r1=1783443&r2=1783444&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Fri Feb 17 18:23:57 2017
@@ -57,6 +57,7 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
@@ -1473,13 +1474,13 @@ public class TestPruneColumn {
}
@Test
- public void testRelayFlattenMap() throws Exception {
+ public void testFlattenMapCantPruneKeys() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
- + "' as (a0, a1:map[]);");
+ + "' as (a0, a1:map[int]);");
pigServer.registerQuery("B = foreach A generate flatten(a1);");
- pigServer.registerQuery("C = foreach B generate a1#'key1';");
-
+ pigServer.registerQuery("B1 = filter B by a1::key == 'key1';");
+ pigServer.registerQuery("C = foreach B1 generate a1::value;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
@@ -1494,8 +1495,7 @@ public class TestPruneColumn {
assertFalse(iter.hasNext());
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
- "Map key required for A: $1->[key1]"}));
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
@Test