You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/31 00:52:08 UTC
svn commit: r1403934 - in /pig/branches/branch-0.11: CHANGES.txt
src/org/apache/pig/impl/util/ObjectSerializer.java
test/org/apache/pig/test/TestMRCompiler.java
test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Author: jcoveney
Date: Tue Oct 30 23:52:07 2012
New Revision: 1403934
URL: http://svn.apache.org/viewvc?rev=1403934&view=rev
Log:
PIG-3017: Pig's object serialization should use compression (jcoveney)
Modified:
pig/branches/branch-0.11/CHANGES.txt
pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java
pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java
pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Tue Oct 30 23:52:07 2012
@@ -310,6 +310,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3017: Pig's object serialization should use compression (jcoveney)
+
PIG-2968: ColumnMapKeyPrune fails to prune a subtree inside foreach (knoguchi via cheolsoo)
PIG-2999: Regression after PIG-2975: BinInterSedesTupleRawComparator secondary sort failing (knoguchi via azaroth)
Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/impl/util/ObjectSerializer.java Tue Oct 30 23:52:07 2012
@@ -1,14 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,19 +22,25 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ObjectSerializer {
-
private static final Log log = LogFactory.getLog(ObjectSerializer.class);
-
+
public static String serialize(Serializable obj) throws IOException {
- if (obj == null) return "";
+ if (obj == null)
+ return "";
try {
ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
- ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
+ Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
+ ObjectOutputStream objStream = new ObjectOutputStream(new DeflaterOutputStream(
+ serialObj, def));
objStream.writeObject(obj);
objStream.close();
return encodeBytes(serialObj.toByteArray());
@@ -44,38 +48,24 @@ public class ObjectSerializer {
throw new IOException("Serialization error: " + e.getMessage(), e);
}
}
-
+
public static Object deserialize(String str) throws IOException {
- if (str == null || str.length() == 0) return null;
+ if (str == null || str.length() == 0)
+ return null;
try {
ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str));
- ObjectInputStream objStream = new ObjectInputStream(serialObj);
+ ObjectInputStream objStream = new ObjectInputStream(new InflaterInputStream(serialObj));
return objStream.readObject();
} catch (Exception e) {
throw new IOException("Deserialization error: " + e.getMessage(), e);
}
}
-
+
public static String encodeBytes(byte[] bytes) {
- StringBuffer strBuf = new StringBuffer();
-
- for (int i = 0; i < bytes.length; i++) {
- strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) 'a')));
- strBuf.append((char) (((bytes[i]) & 0xF) + ((int) 'a')));
- }
-
- return strBuf.toString();
+ return Base64.encodeBase64URLSafeString(bytes);
}
-
+
public static byte[] decodeBytes(String str) {
- byte[] bytes = new byte[str.length() / 2];
- for (int i = 0; i < str.length(); i+=2) {
- char c = str.charAt(i);
- bytes[i/2] = (byte) ((c - 'a') << 4);
- c = str.charAt(i+1);
- bytes[i/2] += (c - 'a');
- }
- return bytes;
+ return Base64.decodeBase64(str);
}
-
}
Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/TestMRCompiler.java Tue Oct 30 23:52:07 2012
@@ -19,7 +19,6 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
@@ -39,34 +38,45 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.AVG;
-import org.apache.pig.builtin.COUNT;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.SUM;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.GFCross;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.COUNT;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.SUM;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.junit.OrderedJUnit4Runner;
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.apache.pig.test.utils.GenPhyOp;
-
-import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -74,14 +84,13 @@ import org.junit.runner.RunWith;
* Test cases to test the MRCompiler.
* VERY IMPORTANT NOTE: The tests here compare results with a
* "golden" set of outputs. In each testcase here, the operators
- * generated have a random operator key which uses Java's Random
+ * generated have a random operator key which uses Java's Random
* class. So if there is a code change which changes the number of
* operators created in a plan, then not only will the "golden" file
* for that test case need to be changed, but also for the tests
* that follow it since the operator keys that will be generated through
* Random will be different.
*/
-
@RunWith(OrderedJUnit4Runner.class)
@TestOrder({
"testRun1",
@@ -101,7 +110,7 @@ import org.junit.runner.RunWith;
"testDistinct1",
"testLimit",
"testMRCompilerErr",
- "testMRCompilerErr1",
+ "testMRCompilerErr1",
"testNumReducersInLimit",
"testNumReducersInLimitWithParallel",
"testUDFInJoin",
@@ -115,27 +124,16 @@ import org.junit.runner.RunWith;
"testSchemaInStoreForDistinctLimit" })
public class TestMRCompiler {
static MiniCluster cluster = MiniCluster.buildCluster();
-
+
static PigContext pc;
static PigContext pcMR;
static final int MAX_SIZE = 100000;
static final long SEED = 1013;
-
- static Random r;
- static{
- pc = new PigContext(ExecType.LOCAL, new Properties());
- pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
- try {
- pc.connect();
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- r = new Random(SEED);
- }
-
+
+ static final Random r = new Random(SEED);
+
PigServer pigServer = null;
PigServer pigServerMR = null;
@@ -146,20 +144,23 @@ public class TestMRCompiler {
// and are sure of
private boolean generate = false;
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ pc = new PigContext(ExecType.LOCAL, new Properties());
+ pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ pc.connect();
+ }
+
@Before
public void setUp() throws ExecException {
GenPhyOp.setR(r);
-
+
GenPhyOp.setPc(pc);
NodeIdGenerator.getGenerator().reset("");
- pigServer = new PigServer( pc );
- pigServerMR = new PigServer( pcMR );
+ pigServer = new PigServer(pc);
+ pigServerMR = new PigServer(pcMR);
}
- @After
- public void tearDown() throws Exception {
- }
-
@Test
public void testRun1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
@@ -447,7 +448,6 @@ public class TestMRCompiler {
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC13.gld");
-
}
@Test
@@ -527,7 +527,6 @@ public class TestMRCompiler {
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC14.gld");
-
}
// Tests Single input case for both blocking and non-blocking
@@ -560,7 +559,6 @@ public class TestMRCompiler {
php.connect(fl, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC1.gld");
-
}
@Test
@@ -581,7 +579,6 @@ public class TestMRCompiler {
php.connect(un, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC2.gld");
-
}
@Test
@@ -774,13 +771,13 @@ public class TestMRCompiler {
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC9.gld");
}
-
+
@Test
public void testSortUDF1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
-
+
// set up order by *
String funcName = WeirdComparator.class.getName();
POUserComparisonFunc comparator = new POUserComparisonFunc(
@@ -794,18 +791,18 @@ public class TestMRCompiler {
topPrj.setOverloaded(true);
topPrj.setResultType(DataType.TUPLE);
nesSortPlan.add(topPrj);
-
+
POProject prjStar2 = new POProject(new OperatorKey("", r.nextLong()));
prjStar2.setResultType(DataType.TUPLE);
prjStar2.setStar(true);
nesSortPlan.add(prjStar2);
-
+
nesSortPlan.connect(topPrj, prjStar2);
List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
nesSortPlanLst.add(nesSortPlan);
-
+
sort.setSortPlans(nesSortPlanLst);
-
+
php.add(sort);
php.connect(ldFil1.getLeaves().get(0), sort);
// have a foreach which takes the sort output
@@ -816,18 +813,18 @@ public class TestMRCompiler {
POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
php.add(fe3);
php.connect(sort, fe3);
-
+
// add a group above the foreach
PhysicalPlan grpChain1 = GenPhyOp.grpChain();
php.merge(grpChain1);
php.connect(fe3,grpChain1.getRoots().get(0));
-
-
+
+
udfs.clear();
udfs.add(AVG.class.getName());
POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
php.addAsLeaf(fe4);
-
+
PhysicalPlan grpChain2 = GenPhyOp.grpChain();
php.merge(grpChain2);
php.connect(fe4,grpChain2.getRoots().get(0));
@@ -836,36 +833,36 @@ public class TestMRCompiler {
udfs.add(GFCross.class.getName());
POForEach fe5 = GenPhyOp.topForEachOPWithUDF(udfs);
php.addAsLeaf(fe5);
-
+
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC15.gld");
}
-
+
@Test
public void testDistinct1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
-
+
PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()),
-1, null);
-
+
php.addAsLeaf(op);
-
+
PhysicalPlan grpChain1 = GenPhyOp.grpChain();
php.merge(grpChain1);
php.connect(op,grpChain1.getRoots().get(0));
-
+
PODistinct op1 = new PODistinct(new OperatorKey("", r.nextLong()),
-1, null);
-
+
php.addAsLeaf(op1);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC16.gld");
}
-
+
@Test
public void testLimit() throws Exception {
PhysicalPlan php = new PhysicalPlan();
@@ -883,37 +880,37 @@ public class TestMRCompiler {
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC17.gld");
}
-
- @Test
+
+ @Test(expected = MRCompilerException.class)
public void testMRCompilerErr() throws Exception {
String query = "a = load 'input';" +
"b = filter a by $0 > 5;" +
"store b into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
pp.remove(pp.getRoots().get(0));
try {
Util.buildMRPlan(new PhysicalPlan(), pc);
- fail("Expected failure.");
} catch (MRCompilerException mrce) {
- assertTrue(mrce.getErrorCode() == 2053);
+ assertEquals(2053, mrce.getErrorCode());
+ throw mrce;
}
}
- @Test
- public void testMRCompilerErr1() throws Exception {
+ @Test(expected = MRCompilerException.class)
+ public void testMRCompilerErr1() throws Exception {
PhysicalPlan pp = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
pp.merge(ldFil1);
-
+
POSplit op = GenPhyOp.topSplitOp();
pp.addAsLeaf(op);
try {
Util.buildMRPlan(pp, pc);
- fail("Expected failure.");
} catch (MRCompilerException mrce) {
- assertTrue(mrce.getErrorCode() == 2025);
+ assertEquals(2025, mrce.getErrorCode());
+ throw mrce;
}
}
@@ -929,44 +926,44 @@ public class TestMRCompiler {
"b = order a by $0;" +
"c = limit b 10;" +
"store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
-
+
while(mrPlan.getSuccessors(mrOper) != null) {
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
- }
- assertTrue(count == 3);
+ }
+ assertEquals(3, count);
}
-
+
/**
* Test to ensure that the order by with parallel followed by a limit, i.e., top k
* always produces the correct number of map reduce jobs
*/
@Test
public void testNumReducersInLimitWithParallel() throws Exception {
- String query = "a = load 'input';" +
+ String query = "a = load 'input';" +
"b = order a by $0 parallel 2;" +
"c = limit b 10;" + "store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServerMR, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
+
LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
la.visit();
la.adjust();
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
-
+
while(mrPlan.getSuccessors(mrOper) != null) {
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
- }
- assertTrue(count == 4);
+ }
+ assertEquals(4, count);
}
@Test
@@ -974,13 +971,13 @@ public class TestMRCompiler {
String query = "a = load 'input1' using BinStorage();" +
"b = load 'input2';" +
"c = join a by $0, b by $0;" + "store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
-
- assertTrue(mrOper.UDFs.size()==2);
- assertTrue(mrOper.UDFs.size()==2);
+
+ assertEquals(2, mrOper.UDFs.size());
+ assertEquals(2, mrOper.UDFs.size());
assertTrue(mrOper.UDFs.contains("BinStorage"));
assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage"));
}
@@ -991,47 +988,42 @@ public class TestMRCompiler {
"b = load '/tmp/input2';" +
"c = join a by $0, b by $0 using 'merge';" +
"store c into '/tmp/output1';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
run(pp, "test/org/apache/pig/test/data/GoldenFiles/MRC18.gld");
}
-
- public static class WeirdComparator extends ComparisonFunc {
+ public static class WeirdComparator extends ComparisonFunc {
@Override
public int compare(Tuple t1, Tuple t2) {
- // TODO Auto-generated method stub
int result = 0;
try {
int i1 = (Integer) t1.get(1);
int i2 = (Integer) t2.get(1);
result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
} catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new RuntimeException(e);
}
return result;
}
-
}
-
+
@Test
public void testMergeJoinWithIndexableLoadFunc() throws Exception{
String query = "a = load 'input1';" +
"b = load 'input2' using " +
TestMergeJoin.DummyIndexableLoader.class.getName() + ";" +
"c = join a by $0, b by $0 using 'merge';" + "store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mp = Util.buildMRPlan(pp, pc);
assertEquals("Checking number of MR Jobs for merge join with " +
"IndexableLoadFunc:", 1, mp.size());
-
}
-
+
@Test
public void testCastFuncShipped() throws Exception{
- String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() +
+ String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() +
"('\t') as (a0, a1, a2);" +
"b = group a by a0;" +
"c = foreach b generate flatten(a);" +
@@ -1043,34 +1035,34 @@ public class TestMRCompiler {
MapReduceOper op = mp.getLeaves().get(0);
assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
}
-
+
@Test
public void testLimitAdjusterFuncShipped() throws Exception{
- String query = "a = load 'input';" +
+ String query = "a = load 'input';" +
"b = order a by $0 parallel 2;" +
"c = limit b 7;" + "store c into 'output' using "
+ PigStorageNoDefCtor.class.getName() + "('\t');";
-
+
PhysicalPlan pp = Util.buildPp(pigServerMR, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
+
LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
la.visit();
la.adjust();
-
+
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
-
+
while(mrPlan.getSuccessors(mrOper) != null) {
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
- }
- assertTrue(count == 4);
+ }
+ assertEquals(4, count);
MapReduceOper op = mrPlan.getLeaves().get(0);
assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
}
-
+
/**
* Test that POSortedDistinct gets printed as POSortedDistinct
* @throws Exception
@@ -1080,7 +1072,7 @@ public class TestMRCompiler {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan grpChain1 = GenPhyOp.loadedGrpChain();
php.merge(grpChain1);
-
+
List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
PhysicalPlan inplan = new PhysicalPlan();
PODistinct op1 = new POSortedDistinct(new OperatorKey("", r.nextLong()),
@@ -1089,7 +1081,7 @@ public class TestMRCompiler {
inputs.add(inplan);
List<Boolean> toFlattens = new ArrayList<Boolean>();
toFlattens.add(false);
- POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1,
+ POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1,
inputs, toFlattens);
php.addAsLeaf(pofe);
@@ -1097,7 +1089,7 @@ public class TestMRCompiler {
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC19.gld");
}
-
+
private void run(PhysicalPlan pp, String expectedFile) throws Exception {
String compiledPlan, goldenPlan = null;
int MAX_SIZE = 100000;
@@ -1110,7 +1102,7 @@ public class TestMRCompiler {
ppp.print(baos);
compiledPlan = baos.toString();
- if(generate ){
+ if(generate){
FileOutputStream fos = new FileOutputStream(expectedFile);
fos.write(baos.toByteArray());
return;
@@ -1147,7 +1139,7 @@ public class TestMRCompiler {
public void ensureAllKeyInstancesInSameSplit() throws IOException {
}
}
-
+
public static class TestIndexableLoadFunc extends PigStorage implements IndexableLoadFunc {
@Override
public void initialize(Configuration conf) throws IOException {
@@ -1161,56 +1153,56 @@ public class TestMRCompiler {
public void close() throws IOException {
}
}
-
+
@Test
public void testUDFInMergedCoGroup() throws Exception {
String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName() + "();" +
"b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
"c = cogroup a by $0, b by $0 using 'merge';" +
"store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
-
+
assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
mrOper = mrPlan.getSuccessors(mrOper).get(0);
assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
}
-
+
@Test
public void testUDFInMergedJoin() throws Exception {
- String query = "a = load 'input1';" +
+ String query = "a = load 'input1';" +
"b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
"c = join a by $0, b by $0 using 'merge';" +
"store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
-
+
assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
}
-
+
//PIG-2146
@Test
public void testSchemaInStoreForDistinctLimit() throws Exception {
//test if the POStore in the 2nd mr plan (that stores the actual output)
- // has a schema
- String query = "a = load 'input1' as (a : int,b :float ,c : int);" +
+ // has a schema
+ String query = "a = load 'input1' as (a : int,b :float ,c : int);" +
"b = distinct a;" +
"c = limit b 10;" +
"store c into 'output';";
-
+
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper secondMrOper = mrPlan.getLeaves().get(0);
POStore store = (POStore)secondMrOper.reducePlan.getLeaves().get(0);
assertEquals(
- "compare load and store schema",
- store.getSchema(),
+ "compare load and store schema",
+ store.getSchema(),
Utils.getSchemaFromString("a : int,b :float ,c : int")
);
}
-}
+}
\ No newline at end of file
Modified: pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=1403934&r1=1403933&r2=1403934&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Tue Oct 30 23:52:07 2012
@@ -18,4 +18,4 @@ Reduce Plan Empty
| | |
| | Project[tuple][*] - scope-111
| |
- | |---b: Load(/tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaacfkaaangfgogeepggebgmgmejgohahfheemaaafgphagngbhaheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlhihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoimlohdjmoaemmdgiacaaaiemaaakgneghcgpgnefgeghgfhdhbaahoaaademaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaognfdgpggheeghcgpgnefgeghgfhdhbaahoaaademaaamgnfdgpgghefegpefgeghgfhdhbaahoaaademaaaignfegpefgeghgfhdhbaahoaaad
hihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbhggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaagiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaaogjhdfahcgpgkgfgdhefegpefgogefkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghejaaaihdhegbhcheedgpgmemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhed
lhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaaahihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaamfkaaafgbgdgdhfgnfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgbgmgjgbhdhbaahoaaaoemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaabbgphcgjghgjgogbgmemgpgdgbhegjgpgohdhbaahoaaagemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhd
gjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaaaappppppppdchahahahahdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachaaaaaaaaaaaaaaaaahdhbaahoaaaaaaaaaaabhhaeaaaaaaakhdhcaabbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaablhih
dhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaablhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahahi','','b','scope','true')) - scope-102
\ No newline at end of file
+ | |---b: Load(/tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVTtvE0EQnlzixJiQhGeDKBCv7k6iQqKABIgwHNgiaXDF5G5zPti7XXb3wpkCiQYKKKFAAomCkt-AhCiooaRC9NSUMLu248MBUZgtLO_M7jy-_b65t9-hphXsv41b6Bcm5f6yUtgLU23Kh5-PvPiIr6ZhqgkzOr3PSgkAU_dm7C9dui5U4qPEqMt8mSb-BkZ3WB77XYyFkD4rWUQRRc7yJM3pSLen0wh5iD2mfMkx1357YGvTDvprygOvA3soUGtzmfNmLgsTQk3IDKWBYyElDfpJA0oapJnkgS08uFZwk15DebZUcGKsNHvKpfRbkik0QtmMT9_pl1_DD10P6iE0slUlsktxwvRdeADTlDO7ynrawGJo0RkkofghzGUhwy1GvqWKz4JGzpmsJV2IWgiz2Q0hjNvNhrCQrYlNM55m3lnXRdVWz6r7UhLaR__UknuxYeMDAD0PpmwVZHFVuNt7Rw98GXWXfLW5L-8_HLr1aRq8VWhwgfEqRgRME3aZrmK6K3hcynPnXcz5e3X6XbJ_S1dTY4fDMuL4P2EnRCvvfAW8NCagdSQkM7CvDyadT4I1o9I8OVsOu-qawTFKc3MS4hGLqTtN7mFNRMNWW4nbLDKj2mY7sJDqgZkeI4870BBbTFmkGG0OSiUiGyhPVjBpba4XkjPdgQMUnR5kjeycOSO5m1DXBpW5IDixJxK8yHJiz8EKe7Z1Z_m78b87vLTDNmoVKGE4ScKhineGnu9ADaOoyEjRqVXysjE2R9y0ON0tSEIsbqNCzhlPdbYCjT586z3JiBjIU3R8W6CNC2Dg8PgIiNFg4JAm9c26U0PF7eFULSZsXW
HElIFT43cly4firRykOHuFSqlValFEaDsfhpwThalkaEhULDd2nBhoj4cfwBj0YQzGYAx-gzFwIzGojkSqY1rZMXNxosA3HKSWV0f_KdDR27khQno1cPKPQ9deCyq6Jq1aTBYBftI6LWk5kNxyH41GKe0sWpmEa_1eKjNkBeYVM4XK1wyaQhMD-gz6fZy0NqyOqURvOFBgu7j-F62xPSXtBb-ZG5Ywte_b6zc_Hj4-49mPYG0LecEIxqXRuetFtsHUo7fPj-x-9vWJFZNLUJbj4e328F-Hp_M6-ModoDlD_S83YUI3yPIXqjl9HQ','','b_43-1','scope','true')) - scope-102
\ No newline at end of file