You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/03/04 19:15:14 UTC
svn commit: r1078085 [11/12] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ s...
Added: pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java?rev=1078085&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java Fri Mar 4 18:15:11 2011
@@ -0,0 +1,5239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.genDummyLOLoadNewLP;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.genFlatSchema;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.genFlatSchemaInTuple;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printCurrentMethodName;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinCondExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NegativeExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
+import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
+import org.apache.pig.newplan.logical.visitor.TypeCheckingExpVisitor;
+import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
+import org.apache.pig.parser.ParserTestingUtils;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTypeCheckingValidatorNewLP {
+
+ // Pig context in LOCAL exec mode with empty properties; handed to the LogicalPlanTester built in setUp().
+ PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+ // Plan tester; replaced with a fresh instance before every test by setUp().
+ LogicalPlanTester planTester;
+ // Message text about an unresolvable load function for bytearray casts.
+ // NOTE(review): presumably compared against type-checker messages further down the class - confirm.
+ private static final String CAST_LOAD_NOT_FOUND =
+ "Cannot resolve load function to use for casting from bytearray";
+
+ /**
+  * Builds a fresh {@link LogicalPlanTester} before each test so that plans
+  * created by one test cannot interact with those of another.
+  */
+ @Before
+ public void setUp() throws Exception {
+     this.planTester = new LogicalPlanTester(this.pc);
+ }
+
+ // Perl one-liner that prints each input line back; the Windows variant escapes the inner quotes.
+ private static final String simpleEchoStreamingCommand;
+ static {
+     // Upper-case with a fixed locale: with the default locale (e.g. Turkish) the
+     // letter "i" maps to a dotted capital and "windows" would never match "WINDOWS".
+     final String osName = System.getProperty("os.name").toUpperCase(java.util.Locale.ENGLISH);
+     if (osName.startsWith("WINDOWS")) {
+         simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+     } else {
+         simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+     }
+
+     // Recreate empty files "a" and "b" in the working directory.
+     // NOTE(review): presumably these are the inputs named by load statements elsewhere in the class - confirm.
+     File fileA = new File("a");
+     File fileB = new File("b");
+     try {
+         fileA.delete();
+         fileB.delete();
+         if (!fileA.createNewFile() || !fileB.createNewFile()) {
+             fail("Unable to create input files");
+         }
+     } catch (IOException e) {
+         // Same error type as fail(), but with the IOException attached as the cause.
+         AssertionError failure = new AssertionError("Unable to create input files:" + e.getMessage());
+         failure.initCause(e);
+         throw failure;
+     }
+     fileA.deleteOnExit();
+     fileB.deleteOnExit();
+ }
+
+ @Test
+ public void testExpressionTypeChecking1() throws Throwable {
+ LogicalExpressionPlan expPlan = new LogicalExpressionPlan();
+
+ ConstantExpression constant1 = new ConstantExpression(expPlan, 10);
+
+ ConstantExpression constant2 = new ConstantExpression(expPlan, 20D) ;
+ ConstantExpression constant3 = new ConstantExpression(expPlan, 123f) ;
+
+ AddExpression add1 = new AddExpression(expPlan, constant1, constant2) ;
+ CastExpression cast1 = new CastExpression(expPlan,constant3, createFS(DataType.DOUBLE)) ;
+ MultiplyExpression mul1 = new MultiplyExpression(expPlan, add1, cast1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(expPlan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ // Induction check
+ assertEquals(DataType.DOUBLE, add1.getType()) ;
+ assertEquals(DataType.DOUBLE, mul1.getType()) ;
+
+ // Cast insertion check
+ assertEquals(DataType.DOUBLE, add1.getLhs().getType()) ;
+ assertEquals(DataType.DOUBLE, mul1.getRhs().getType()) ;
+
+ }
+
+ /**
+  * Creates a field schema of the given type with a null alias and a null nested schema.
+  *
+  * @param datatype the {@link DataType} byte constant for the field
+  * @return the new field schema
+  */
+ private LogicalFieldSchema createFS(byte datatype) {
+     LogicalFieldSchema fieldSchema = new LogicalFieldSchema(null, null, datatype);
+     return fieldSchema;
+ }
+
+ /**
+  * Multiplies (10 + 20D) by a string constant cast to BYTEARRAY and expects the
+  * type checker to reject the plan: visit() must throw a TypeCheckerException and
+  * the collector must hold an error.
+  */
+ @Test
+ public void testExpressionTypeCheckingFail1() throws Throwable {
+     LogicalExpressionPlan plan = new LogicalExpressionPlan();
+     ConstantExpression constant1 = new ConstantExpression(plan, 10);
+     ConstantExpression constant2 = new ConstantExpression(plan, 20D);
+     ConstantExpression constant3 = new ConstantExpression(plan, "123");
+
+     AddExpression add1 = new AddExpression(plan, constant1, constant2);
+     CastExpression cast1 = new CastExpression(plan, constant3, createFS(DataType.BYTEARRAY));
+     MultiplyExpression mul1 = new MultiplyExpression(plan, add1, cast1);
+
+     CompilationMessageCollector collector = new CompilationMessageCollector();
+     TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+     try {
+         expTypeChecker.visit();
+         fail("Exception expected");
+     }
+     catch (TypeCheckerException pve) {
+         // good
+     }
+     printMessageCollector(collector);
+
+     // The exception alone is not enough: the error must also be recorded.
+     // A missing error is an assertion failure, reported the same way as in
+     // testExpressionTypeChecking6/7.
+     if (!collector.hasError()) {
+         throw new AssertionError("Error expected");
+     }
+ }
+
+ @Test
+ public void testExpressionTypeChecking2() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, new DataByteArray()) ;
+ ConstantExpression constant3 = new ConstantExpression(plan, 123L) ;
+ ConstantExpression constant4 = new ConstantExpression(plan, true) ;
+
+ SubtractExpression sub1 = new SubtractExpression(plan, constant1, constant2) ;
+ GreaterThanExpression gt1 = new GreaterThanExpression(plan, sub1, constant3) ;
+ AndExpression and1 = new AndExpression(plan, gt1, constant4) ;
+ NotExpression not1 = new NotExpression(plan, and1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+
+
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error not expected during type checking") ;
+ }
+
+
+ // Induction check
+ assertEquals(DataType.INTEGER, sub1.getType()) ;
+ assertEquals(DataType.BOOLEAN, gt1.getType()) ;
+ assertEquals(DataType.BOOLEAN, and1.getType()) ;
+ assertEquals(DataType.BOOLEAN, not1.getType()) ;
+
+ // Cast insertion check
+ assertEquals(DataType.INTEGER, sub1.getRhs().getType()) ;
+ assertEquals(DataType.LONG, gt1.getLhs().getType()) ;
+
+ }
+
+
+ @Test
+ public void testExpressionTypeChecking3() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, 20L) ;
+ ConstantExpression constant3 = new ConstantExpression(plan, 123) ;
+
+ ModExpression mod1 = new ModExpression(plan, constant1, constant2) ;
+ EqualExpression equal1 = new EqualExpression(plan, mod1, constant3) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+
+ plan.explain(System.out, "text", true);
+
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ // Induction check
+ assertEquals(DataType.LONG, mod1.getType()) ;
+ assertEquals(DataType.BOOLEAN, equal1.getType()) ;
+
+ // Cast insertion check
+ assertEquals(DataType.LONG, mod1.getLhs().getType()) ;
+ assertEquals(DataType.LONG, equal1.getRhs().getType()) ;
+
+ }
+
+ @Test
+ public void testExpressionTypeChecking4() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, 20D) ;
+ ConstantExpression constant3 = new ConstantExpression(plan, 123f) ;
+
+ DivideExpression div1 = new DivideExpression(plan, constant1, constant2) ;
+ CastExpression cast1 = new CastExpression(plan, constant3, createFS(DataType.DOUBLE));
+ NotEqualExpression notequal1 = new NotEqualExpression(plan, div1, cast1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ // Induction check
+ assertEquals(DataType.DOUBLE, div1.getType()) ;
+ assertEquals(DataType.BOOLEAN, notequal1.getType()) ;
+
+ // Cast insertion check
+ assertEquals(DataType.DOUBLE, div1.getLhs().getType()) ;
+ assertEquals(DataType.DOUBLE, notequal1.getRhs().getType()) ;
+
+ }
+
+ /**
+  * Compares (10 / 20D) for inequality with a string constant cast to BYTEARRAY and
+  * expects the type checker to reject the plan: visit() must throw a
+  * TypeCheckerException and the collector must hold an error.
+  */
+ @Test
+ public void testExpressionTypeCheckingFail4() throws Throwable {
+     LogicalExpressionPlan plan = new LogicalExpressionPlan();
+     ConstantExpression constant1 = new ConstantExpression(plan, 10);
+     ConstantExpression constant2 = new ConstantExpression(plan, 20D);
+     ConstantExpression constant3 = new ConstantExpression(plan, "123");
+
+     DivideExpression div1 = new DivideExpression(plan, constant1, constant2);
+     CastExpression cast1 = new CastExpression(plan, constant3, createFS(DataType.BYTEARRAY));
+     NotEqualExpression notequal1 = new NotEqualExpression(plan, div1, cast1);
+
+     CompilationMessageCollector collector = new CompilationMessageCollector();
+     TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+
+     try {
+         expTypeChecker.visit();
+         fail("Exception expected");
+     }
+     catch (TypeCheckerException pve) {
+         // good
+     }
+     printMessageCollector(collector);
+
+     // The exception alone is not enough: the error must also be recorded.
+     // A missing error is an assertion failure, reported the same way as in
+     // testExpressionTypeChecking6/7.
+     if (!collector.hasError()) {
+         throw new AssertionError("Error expected");
+     }
+ }
+
+ @Test
+ public void testExpressionTypeChecking5() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10F) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, 20L) ;
+ ConstantExpression constant3 = new ConstantExpression(plan, 123F) ;
+ ConstantExpression constant4 = new ConstantExpression(plan, 123D) ;
+
+ LessThanEqualExpression lesser1 = new LessThanEqualExpression(plan, constant1, constant2) ;
+ BinCondExpression bincond1 = new BinCondExpression(plan, lesser1, constant3, constant4) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ // Induction check
+ assertEquals(DataType.BOOLEAN, lesser1.getType()) ;
+ assertEquals(DataType.DOUBLE, bincond1.getType()) ;
+
+ // Cast insertion check
+ assertEquals(DataType.FLOAT, lesser1.getLhs().getType()) ;
+ assertEquals(DataType.FLOAT, lesser1.getRhs().getType()) ;
+ assertEquals(DataType.DOUBLE, bincond1.getLhs().getType()) ;
+ assertEquals(DataType.DOUBLE, bincond1.getRhs().getType()) ;
+
+ }
+
+ @Test
+ public void testExpressionTypeChecking6() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, "10") ;
+ ConstantExpression constant2 = new ConstantExpression(plan, 20L) ;
+
+ AddExpression add1 = new AddExpression(plan, constant1, constant2) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+ try {
+ expTypeChecker.visit();
+ fail("Exception expected") ;
+ }
+ catch (TypeCheckerException pve) {
+ // good
+ }
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Error expected") ;
+ }
+
+ }
+
+ /**
+  * Creates a minimal relational operator named "dummy" with alias "dummy", attached
+  * to a new empty {@link LogicalPlan}. It has a null schema, ignores visitors and is
+  * never equal to another operator.
+  *
+  * @return a dummy logical relational operator
+  */
+ private LogicalRelationalOperator createDummyRelOpWithAlias() {
+     class DummyRelOp extends LogicalRelationalOperator {
+         DummyRelOp() {
+             super("dummy", new LogicalPlan());
+             alias = "dummy";
+         }
+
+         @Override
+         public boolean isEqual(Operator operator) throws FrontendException {
+             return false;
+         }
+
+         @Override
+         public void accept(PlanVisitor v) throws FrontendException {
+             // intentionally a no-op
+         }
+
+         @Override
+         public LogicalSchema getSchema() throws FrontendException {
+             return null;
+         }
+     }
+
+     LogicalRelationalOperator dummy = new DummyRelOp();
+     return dummy;
+ }
+
+
+
+
+ @Test
+ public void testExpressionTypeChecking7() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, 20D) ;
+ ConstantExpression constant3 = new ConstantExpression(plan, 123L) ;
+
+ GreaterThanExpression gt1 = new GreaterThanExpression(plan, constant1, constant2) ;
+ EqualExpression equal1 = new EqualExpression(plan, gt1, constant3) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+ try {
+ expTypeChecker.visit();
+ fail("Exception expected") ;
+ }
+ catch (TypeCheckerException pve) {
+ // good
+ }
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Error expected") ;
+ }
+ }
+
+ @Test
+ public void testExpressionTypeChecking8() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+
+ TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ ArrayList<Object> innerObjList = new ArrayList<Object>();
+ ArrayList<Object> objList = new ArrayList<Object>();
+
+ innerObjList.add(10);
+ innerObjList.add(3);
+ innerObjList.add(7);
+ innerObjList.add(17);
+
+ Tuple innerTuple = tupleFactory.newTuple(innerObjList);
+
+ objList.add("World");
+ objList.add(42);
+ objList.add(innerTuple);
+
+ Tuple tuple = tupleFactory.newTuple(objList);
+
+ ArrayList<Schema.FieldSchema> innerFss = new ArrayList<Schema.FieldSchema>();
+ ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+ ArrayList<Schema.FieldSchema> castFss = new ArrayList<Schema.FieldSchema>();
+
+ Schema.FieldSchema stringFs = new Schema.FieldSchema(null, DataType.CHARARRAY);
+ Schema.FieldSchema intFs = new Schema.FieldSchema(null, DataType.INTEGER);
+
+ for(int i = 0; i < innerObjList.size(); ++i) {
+ innerFss.add(intFs);
+ }
+
+ Schema innerTupleSchema = new Schema(innerFss);
+
+ fss.add(stringFs);
+ fss.add(intFs);
+ fss.add(new Schema.FieldSchema(null, innerTupleSchema, DataType.TUPLE));
+
+ Schema tupleSchema = new Schema(fss);
+
+ Schema.FieldSchema byteArrayFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ for(int i = 0; i < 4; ++i) {
+ castFss.add(byteArrayFs);
+ }
+
+ Schema castSchema = new Schema(castFss);
+
+
+ ConstantExpression constant1 = new ConstantExpression(plan, innerTuple) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, tuple) ;
+ CastExpression cast1 = new CastExpression(plan, constant1,
+ Util.translateFieldSchema(new FieldSchema(null, castSchema, DataType.TUPLE))) ;
+
+ EqualExpression equal1 = new EqualExpression(plan, cast1, constant2) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+
+ LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ assertEquals(DataType.BOOLEAN, equal1.getType()) ;
+ assertEquals(DataType.TUPLE, equal1.getRhs().getType()) ;
+ assertEquals(DataType.TUPLE, equal1.getLhs().getType()) ;
+ }
+
+ /*
+ * chararray can been cast to int when jira-893 been resolved
+ */
+ @Test
+ public void testExpressionTypeChecking9() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+
+ TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ ArrayList<Object> innerObjList = new ArrayList<Object>();
+ ArrayList<Object> objList = new ArrayList<Object>();
+
+ innerObjList.add("10");
+ innerObjList.add("3");
+ innerObjList.add(7);
+ innerObjList.add("17");
+
+ Tuple innerTuple = tupleFactory.newTuple(innerObjList);
+
+ objList.add("World");
+ objList.add(42);
+ objList.add(innerTuple);
+
+ Tuple tuple = tupleFactory.newTuple(objList);
+
+ ArrayList<Schema.FieldSchema> innerFss = new ArrayList<Schema.FieldSchema>();
+ ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+ ArrayList<Schema.FieldSchema> castFss = new ArrayList<Schema.FieldSchema>();
+
+ Schema.FieldSchema stringFs = new Schema.FieldSchema(null, DataType.CHARARRAY);
+ Schema.FieldSchema intFs = new Schema.FieldSchema(null, DataType.INTEGER);
+ Schema.FieldSchema doubleFs = new Schema.FieldSchema(null, DataType.DOUBLE);
+
+ innerFss.add(stringFs);
+ innerFss.add(stringFs);
+ innerFss.add(intFs);
+ innerFss.add(stringFs);
+
+ Schema innerTupleSchema = new Schema(innerFss);
+
+ fss.add(stringFs);
+ fss.add(intFs);
+ fss.add(new Schema.FieldSchema(null, innerTupleSchema, DataType.TUPLE));
+
+ Schema tupleSchema = new Schema(fss);
+
+ castFss.add(stringFs);
+ castFss.add(stringFs);
+ castFss.add(doubleFs);
+ castFss.add(intFs);
+
+ Schema castSchema = new Schema(castFss);
+
+
+ ConstantExpression constant1 = new ConstantExpression(plan, innerTuple) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, tuple) ;
+ CastExpression cast1 = new CastExpression(plan, constant1,
+ Util.translateFieldSchema(new FieldSchema(null, castSchema, DataType.TUPLE))) ;
+
+ EqualExpression equal1 = new EqualExpression(plan, cast1, constant2) ;
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+ try {
+ expTypeChecker.visit();
+ }
+ catch (TypeCheckerException pve) {
+ fail("Exception not expected") ;
+ }
+
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error expected") ;
+ }
+ }
+
+ @Test
+ public void testArithmeticOpCastInsert1() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, 20D) ;
+
+ MultiplyExpression mul1 = new MultiplyExpression(plan,constant1, constant2) ;
+
+ // Before type checking its set correctly - PIG-421
+// System.out.println(DataType.findTypeName(mul1.getType())) ;
+// assertEquals(DataType.DOUBLE, mul1.getType()) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+
+ //printTypeGraph(plan) ;
+
+ // After type checking
+ System.out.println(DataType.findTypeName(mul1.getType())) ;
+ assertEquals(DataType.DOUBLE, mul1.getType()) ;
+ }
+
+ /**
+  * Type-checks (-10) - 20L and expects a LONG subtraction whose left operand has been
+  * wrapped in a cast to LONG around the original negation.
+  */
+ @Test
+ public void testArithmeticOpCastInsert2() throws Throwable {
+     LogicalExpressionPlan plan = new LogicalExpressionPlan();
+     ConstantExpression constant1 = new ConstantExpression(plan, 10);
+     ConstantExpression constant2 = new ConstantExpression(plan, 20L);
+
+     NegativeExpression neg1 = new NegativeExpression(plan, constant1);
+     SubtractExpression subtract1 = new SubtractExpression(plan, neg1, constant2);
+
+     // (disabled, PIG-421) pre-check that the type is already LONG before type checking:
+     // assertEquals(DataType.LONG, subtract1.getType());
+
+     CompilationMessageCollector collector = new CompilationMessageCollector();
+     TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+     expTypeChecker.visit();
+     printMessageCollector(collector);
+
+     // After type checking
+     System.out.println(DataType.findTypeName(subtract1.getType()));
+     assertEquals(DataType.LONG, subtract1.getType());
+
+     // Expected value goes first so a failure reports "expected LONG but was ...".
+     assertTrue(subtract1.getLhs() instanceof CastExpression);
+     CastExpression insertedCast = (CastExpression) subtract1.getLhs();
+     assertEquals(DataType.LONG, insertedCast.getType());
+     assertTrue("inserted cast should wrap the original negation",
+             insertedCast.getExpression() == neg1);
+ }
+
+ /**
+  * Type-checks 10 % 20L and expects a LONG modulo whose left operand has been wrapped
+  * in a cast to LONG around the original int constant.
+  */
+ @Test
+ public void testModCastInsert1() throws Throwable {
+     LogicalExpressionPlan plan = new LogicalExpressionPlan();
+     ConstantExpression constant1 = new ConstantExpression(plan, 10);
+     ConstantExpression constant2 = new ConstantExpression(plan, 20L);
+
+     ModExpression mod1 = new ModExpression(plan, constant1, constant2);
+
+     // (disabled, PIG-421) pre-check that the type is already LONG before type checking:
+     // assertEquals(DataType.LONG, mod1.getType());
+
+     CompilationMessageCollector collector = new CompilationMessageCollector();
+     TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+     expTypeChecker.visit();
+     printMessageCollector(collector);
+
+     // After type checking
+     System.out.println(DataType.findTypeName(mod1.getType()));
+     assertEquals(DataType.LONG, mod1.getType());
+
+     // Expected value goes first so a failure reports "expected LONG but was ...".
+     assertTrue(mod1.getLhs() instanceof CastExpression);
+     CastExpression insertedCast = (CastExpression) mod1.getLhs();
+     assertEquals(DataType.LONG, insertedCast.getType());
+     assertTrue("inserted cast should wrap the original constant",
+             insertedCast.getExpression() == constant1);
+ }
+
+
+ // Positive case
+ @Test
+ public void testRegexTypeChecking1() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, "10") ;
+ ConstantExpression constant2 = new ConstantExpression(plan, "Regex");
+
+ RegexExpression regex = new RegexExpression(plan, constant1, constant2) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+
+ //printTypeGraph(plan) ;
+
+ // After type checking
+ System.out.println(DataType.findTypeName(regex.getType())) ;
+ assertEquals(DataType.BOOLEAN, regex.getType()) ;
+ }
+
+ // Positive case with cast insertion
+ @Test
+ public void testRegexTypeChecking2() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan,
+ new DataByteArray()
+ ) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, "Regex");
+
+ RegexExpression regex = new RegexExpression(plan, constant1, constant2);
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+
+ //printTypeGraph(plan) ;
+
+ // After type checking
+
+ if (collector.hasError()) {
+ throw new Exception("Error not expected during type checking") ;
+ }
+
+ // check type
+ System.out.println(DataType.findTypeName(regex.getType())) ;
+ assertEquals(DataType.BOOLEAN, regex.getType()) ;
+
+ // check wiring
+ CastExpression cast = (CastExpression) regex.getLhs() ;
+ assertEquals(cast.getType(), DataType.CHARARRAY);
+ assertEquals(cast.getExpression(), constant1) ;
+ }
+
+ // Negative case
+ @Test
+ public void testRegexTypeChecking3() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ ConstantExpression constant2 = new ConstantExpression(plan, "Regex");
+
+ RegexExpression regex = new RegexExpression(plan, constant1, constant2);
+
+ try {
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+ expTypeChecker.visit();
+ printMessageCollector(collector) ;
+ fail("Exception expected") ;
+ }
+ catch (TypeCheckerException pve) {
+ // good
+ }
+
+ }
+
+ // Expression plan has to support DAG before this can be used.
+ // Currently it supports only tree.
+
+ /*
+ @Test
+ public void testDiamondShapedExpressionPlan1() throws Throwable {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+ ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+ constant1.setType(DataType.LONG) ;
+
+ LONegative neg1 = new LONegative(plan, constant1) ;
+ LONegative neg2 = new LONegative(plan, constant1) ;
+
+ DivideExpression div1 = new DivideExpression(plan, neg1, neg2) ;
+
+ LONegative neg3 = new LONegative(plan, div1) ;
+ LONegative neg4 = new LONegative(plan, div1) ;
+
+ AddExpression add1 = new AddExpression(plan, neg3, neg4) ;
+
+ plan.add(constant1) ;
+ plan.add(neg1) ;
+ plan.add(neg2) ;
+ plan.add(div1) ;
+ plan.add(neg3) ;
+ plan.add(neg4) ;
+ plan.add(add1) ;
+
+ plan.connect(constant1, neg1) ;
+ plan.connect(constant1, neg2) ;
+
+ plan.connect(neg1, div1) ;
+ plan.connect(neg2, div1) ;
+
+ plan.connect(div1, neg3) ;
+ plan.connect(div1, neg3) ;
+
+ plan.connect(neg3, add1) ;
+ plan.connect(neg4, add1) ;
+
+ // Before type checking
+ assertEquals(DataType.UNKNOWN, add1.getType()) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+ printMessageCollector(collector) ;
+
+ //printTypeGraph(plan) ;
+
+ // After type checking
+ assertEquals(DataType.LONG, div1.getType()) ;
+ assertEquals(DataType.LONG, add1.getType()) ;
+
+ }
+ */
+
+ /**
+  * Checks that type checking a union whose two inputs both need casting
+  * leaves a casting LOForEach between each load and the LOUnion.
+  * <p>
+  * Input #1 is (int, long, bytearray, chararray) and input #2 is
+  * (double, int, float, chararray); the expected union schema is
+  * (double, long, float, chararray).
+  * <p>
+  * The {@code @Test} annotation is commented out in this revision.
+  */
+ //@Test
+ public void testUnionCastingInsert1() throws Throwable {
+
+     printCurrentMethodName();
+     LogicalPlan plan = new LogicalPlan();
+
+     String pigStorage = PigStorage.class.getName();
+
+     LOLoad load1 = new LOLoad(
+             new FileSpec("pi", new FuncSpec(pigStorage)),
+             null, plan, null);
+
+     LOLoad load2 = new LOLoad(
+             new FileSpec("pi", new FuncSpec(pigStorage)),
+             null, plan, null);
+
+     // schema for input#1
+     List<FieldSchema> fsList1 = new ArrayList<FieldSchema>();
+     fsList1.add(new FieldSchema("field1a", DataType.INTEGER));
+     fsList1.add(new FieldSchema("field2a", DataType.LONG));
+     fsList1.add(new FieldSchema(null, DataType.BYTEARRAY));
+     fsList1.add(new FieldSchema(null, DataType.CHARARRAY));
+     Schema inputSchema1 = new Schema(fsList1);
+
+     // schema for input#2
+     List<FieldSchema> fsList2 = new ArrayList<FieldSchema>();
+     fsList2.add(new FieldSchema("field1b", DataType.DOUBLE));
+     fsList2.add(new FieldSchema(null, DataType.INTEGER));
+     fsList2.add(new FieldSchema("field3b", DataType.FLOAT));
+     fsList2.add(new FieldSchema("field4b", DataType.CHARARRAY));
+     Schema inputSchema2 = new Schema(fsList2);
+
+     // set schemas
+     load1.setScriptSchema(Util.translateSchema(inputSchema1));
+     load2.setScriptSchema(Util.translateSchema(inputSchema2));
+
+     // create union operator
+     LOUnion union = new LOUnion(plan, false);
+
+     // wiring
+     plan.add(load1);
+     plan.add(load2);
+     plan.add(union);
+
+     plan.connect(load1, union);
+     plan.connect(load2, union);
+
+     // validate
+     CompilationMessageCollector collector = new CompilationMessageCollector();
+     TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+     typeChecker.visit();
+     printMessageCollector(collector);
+
+     // check end result schema
+     Schema outputSchema = Util.translateSchema(union.getSchema());
+
+     List<FieldSchema> fsListExpected = new ArrayList<FieldSchema>();
+     fsListExpected.add(new FieldSchema("field1a", DataType.DOUBLE));
+     fsListExpected.add(new FieldSchema("field2a", DataType.LONG));
+     fsListExpected.add(new FieldSchema("field3b", DataType.FLOAT));
+     fsListExpected.add(new FieldSchema("field4b", DataType.CHARARRAY));
+     Schema expectedSchema = new Schema(fsListExpected);
+
+     assertTrue(Schema.equals(outputSchema, expectedSchema, true, false));
+
+     //printTypeGraph(plan) ;
+
+     // check the inserted casting of input1
+     {
+         // Check wiring: load1 -> foreach -> union
+         List<Operator> sucList1 = plan.getSuccessors(load1);
+         assertEquals(1, sucList1.size());
+         // Verify the type before casting so a wrong operator fails the
+         // assertion instead of raising a ClassCastException.
+         Operator successor = sucList1.get(0);
+         assertTrue(successor instanceof LOForEach);
+         LOForEach foreach = (LOForEach) successor;
+
+         List<Operator> sucList2 = plan.getSuccessors(foreach);
+         assertEquals(1, sucList2.size());
+         assertTrue(sucList2.get(0) instanceof LOUnion);
+
+         // Check inserted casting
+         checkForEachCasting(foreach, 0, true, DataType.DOUBLE);
+         checkForEachCasting(foreach, 1, false, DataType.UNKNOWN);
+         checkForEachCasting(foreach, 2, true, DataType.FLOAT);
+         checkForEachCasting(foreach, 3, false, DataType.UNKNOWN);
+     }
+
+     // check the inserted casting of input2
+     {
+         // Check wiring: load2 -> foreach -> union
+         List<Operator> sucList1 = plan.getSuccessors(load2);
+         assertEquals(1, sucList1.size());
+         Operator successor = sucList1.get(0);
+         assertTrue(successor instanceof LOForEach);
+         LOForEach foreach = (LOForEach) successor;
+
+         List<Operator> sucList2 = plan.getSuccessors(foreach);
+         assertEquals(1, sucList2.size());
+         assertTrue(sucList2.get(0) instanceof LOUnion);
+
+         // Check inserted casting
+         checkForEachCasting(foreach, 0, false, DataType.UNKNOWN);
+         checkForEachCasting(foreach, 1, true, DataType.LONG);
+         checkForEachCasting(foreach, 2, false, DataType.UNKNOWN);
+         checkForEachCasting(foreach, 3, false, DataType.UNKNOWN);
+     }
+
+ }
+ //
+ //
+ // // This tests when only one input needs casting
+ // //@Test
+ // public void testUnionCastingInsert2() throws Throwable {
+ //
+ // printCurrentMethodName();
+ // LogicalPlan plan = new LogicalPlan() ;
+ //
+ // String pigStorage = PigStorage.class.getName() ;
+ //
+ // LOLoad load1 = new LOLoad(plan,
+ //
+ // new FileSpec("pi", new FuncSpec(pigStorage)),
+ // null) ;
+ // LOLoad load2 = new LOLoad(plan,
+ //
+ // new FileSpec("pi", new FuncSpec(pigStorage)),
+ // null) ;
+ //
+ // // schema for input#1
+ // Schema inputSchema1 = null ;
+ // {
+ // List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ // fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+ // fsList1.add(new FieldSchema("field2a", DataType.BYTEARRAY)) ;
+ // inputSchema1 = new Schema(fsList1) ;
+ // }
+ //
+ // // schema for input#2
+ // Schema inputSchema2 = null ;
+ // {
+ // List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+ // fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+ // fsList2.add(new FieldSchema("field2b", DataType.DOUBLE)) ;
+ // inputSchema2 = new Schema(fsList2) ;
+ // }
+ //
+ // // set schemas
+ // load1.setEnforcedSchema(inputSchema1) ;
+ // load2.setEnforcedSchema(inputSchema2) ;
+ //
+ // // create union operator
+ // ArrayList<LogicalOperator> inputList = new ArrayList<LogicalOperator>() ;
+ // inputList.add(load1) ;
+ // inputList.add(load2) ;
+ // LOUnion union = new LOUnion(plan) ;
+ //
+ // // wiring
+ // plan.add(load1) ;
+ // plan.add(load2) ;
+ // plan.add(union) ;
+ //
+ // plan.connect(load1, union);
+ // plan.connect(load2, union);
+ //
+ // // validate
+ // CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ // TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ // typeValidator.validate(plan, collector) ;
+ // printMessageCollector(collector) ;
+ //
+ // // check end result schema
+ // Schema outputSchema = union.getSchema() ;
+ //
+ // Schema expectedSchema = null ;
+ // {
+ // List<FieldSchema> fsListExpected = new ArrayList<FieldSchema>() ;
+ // fsListExpected.add(new FieldSchema("field1a", DataType.DOUBLE)) ;
+ // fsListExpected.add(new FieldSchema("field2a", DataType.DOUBLE)) ;
+ // expectedSchema = new Schema(fsListExpected) ;
+ // }
+ //
+ // assertTrue(Schema.equals(outputSchema, expectedSchema, true, false)) ;
+ //
+ // //printTypeGraph(plan) ;
+ //
+ // // check the inserted casting of input1
+ // {
+ // // Check wiring
+ // List<LogicalOperator> sucList1 = plan.getSuccessors(load1) ;
+ // assertEquals(sucList1.size(), 1);
+ // LOForEach foreach = (LOForEach) sucList1.get(0) ;
+ // assertTrue(foreach instanceof LOForEach) ;
+ //
+ // List<LogicalOperator> sucList2 = plan.getSuccessors(foreach) ;
+ // assertEquals(sucList2.size(), 1);
+ // assertTrue(sucList2.get(0) instanceof LOUnion) ;
+ //
+ // // Check inserted casting
+ // checkForEachCasting(foreach, 0, true, DataType.DOUBLE) ;
+ // checkForEachCasting(foreach, 1, true, DataType.DOUBLE) ;
+ //
+ // }
+ //
+ // // check the inserted casting of input2
+ // {
+ // // Check wiring
+ // List<LogicalOperator> sucList1 = plan.getSuccessors(load2) ;
+ // assertEquals(sucList1.size(), 1);
+ // assertTrue(sucList1.get(0) instanceof LOUnion) ;
+ // }
+ //
+ // }
+ //
+ // // This has to fail under strict typing mode
+ // /*
+ // // This is a negative test
+ // // Two inputs cannot be merged due to incompatible schemas
+ // @Test
+ // public void testUnionCastingInsert3() throws Throwable {
+ // LogicalPlan plan = new LogicalPlan() ;
+ //
+ // String pigStorage = PigStorage.class.getName() ;
+ //
+ // LOLoad load1 = new LOLoad(plan,
+ //
+ // new FileSpec("pi", new FuncSpec(pigStorage)),
+ // null) ;
+ // LOLoad load2 = new LOLoad(plan,
+ //
+ // new FileSpec("pi", new FuncSpec(pigStorage)),
+ // null) ;
+ //
+ // // schema for input#1
+ // Schema inputSchema1 = null ;
+ // {
+ // List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ // fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+ // fsList1.add(new FieldSchema("field2a", DataType.BYTEARRAY)) ;
+ // inputSchema1 = new Schema(fsList1) ;
+ // }
+ //
+ // // schema for input#2
+ // Schema inputSchema2 = null ;
+ // {
+ // List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+ // fsList2.add(new FieldSchema("field1b", DataType.CHARARRAY)) ;
+ // fsList2.add(new FieldSchema("field2b", DataType.DOUBLE)) ;
+ // inputSchema2 = new Schema(fsList2) ;
+ // }
+ //
+ // // set schemas
+ // load1.setEnforcedSchema(inputSchema1) ;
+ // load2.setEnforcedSchema(inputSchema2) ;
+ //
+ // // create union operator
+ // ArrayList<LogicalOperator> inputList = new ArrayList<LogicalOperator>() ;
+ // inputList.add(load1) ;
+ // inputList.add(load2) ;
+ // LOUnion union = new LOUnion(plan, inputList) ;
+ //
+ // // wiring
+ // plan.add(load1) ;
+ // plan.add(load2) ;
+ // plan.add(union) ;
+ //
+ // plan.connect(load1, union);
+ // plan.connect(load2, union);
+ //
+ // // validate
+ // CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ // TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ // try {
+ // typeValidator.validate(plan, collector) ;
+ // fail("Exception expected") ;
+ // }
+ // catch (TypeCheckerException pve) {
+ // // good
+ // }
+ // printMessageCollector(collector) ;
+ //
+ // }
+ // */
+
+ /**
+  * Type checks a plan consisting of a load followed by a distinct and verifies
+  * that the distinct operator's schema equals the schema of the load.
+  */
+ @Test
+ public void testDistinct1() throws Throwable {
+
+     printCurrentMethodName();
+     LogicalPlan plan = new LogicalPlan();
+
+     String storageFunc = PigStorage.class.getName();
+     FileSpec inputSpec = new FileSpec("pi", new FuncSpec(storageFunc));
+     LOLoad load1 = new LOLoad(inputSpec, null, plan, null);
+
+     // nested schema used by "field3"
+     List<FieldSchema> nestedFields = new ArrayList<FieldSchema>();
+     nestedFields.add(new FieldSchema("innerfield1", DataType.BAG));
+     nestedFields.add(new FieldSchema("innerfield2", DataType.FLOAT));
+     Schema nestedSchema = new Schema(nestedFields);
+
+     // top-level schema of the load
+     List<FieldSchema> topLevelFields = new ArrayList<FieldSchema>();
+     topLevelFields.add(new FieldSchema("field1", DataType.INTEGER));
+     topLevelFields.add(new FieldSchema("field2", DataType.BYTEARRAY));
+     topLevelFields.add(new FieldSchema("field3", nestedSchema));
+     topLevelFields.add(new FieldSchema("field4", DataType.BAG));
+     Schema loadSchema = new Schema(topLevelFields);
+
+     load1.setSchema(Util.translateSchema(loadSchema));
+
+     // create the distinct operator
+     LODistinct distinct1 = new LODistinct(plan);
+
+     // wiring: load1 -> distinct1
+     plan.add(load1);
+     plan.add(distinct1);
+     plan.connect(load1, distinct1);
+
+     // run the type checker
+     CompilationMessageCollector messages = new CompilationMessageCollector();
+     TypeCheckingRelVisitor checker = new TypeCheckingRelVisitor(plan, messages);
+     checker.visit();
+     printMessageCollector(messages);
+
+     // the distinct must expose exactly the schema of its input
+     LogicalSchema distinctSchema = distinct1.getSchema();
+     assertTrue(load1.getSchema().isEqual(distinctSchema));
+ }
+
+ /**
+  * Positive test: runs the filter helper with an INTEGER first field and a
+  * LONG second field.
+  */
+ @Test
+ public void testFilterWithInnerPlan1() throws Throwable {
+     final byte firstFieldType = DataType.INTEGER;
+     final byte secondFieldType = DataType.LONG;
+     testFilterWithInnerPlan(firstFieldType, secondFieldType);
+ }
+
+ /**
+  * Positive test: runs the filter helper with an INTEGER first field and a
+  * BYTEARRAY second field.
+  */
+ @Test
+ public void testFilterWithInnerPlan2() throws Throwable {
+     final byte firstFieldType = DataType.INTEGER;
+     final byte secondFieldType = DataType.BYTEARRAY;
+     testFilterWithInnerPlan(firstFieldType, secondFieldType);
+ }
+
+ // Filter test helper
+ public void testFilterWithInnerPlan(byte field1Type, byte field2Type) throws Throwable {
+
+ // Create outer plan
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1", field1Type)) ;
+ fsList1.add(new FieldSchema("field2", field2Type)) ;
+
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema(inputSchema1)) ;
+
+ // Create inner plan
+ LogicalExpressionPlan innerPlan = new LogicalExpressionPlan() ;
+ // filter
+ LOFilter filter1 = new LOFilter(plan) ;
+ filter1.setFilterPlan(innerPlan);
+
+ ProjectExpression project1 = new ProjectExpression(innerPlan, 0, 0, filter1);
+
+// new ProjectExpression(innerPlan, load1, 0) ;
+ // project1.setSentinel(true);
+ ProjectExpression project2 = new ProjectExpression(innerPlan, 0, 1, filter1);
+ //new ProjectExpression(innerPlan, load1, 1) ;
+ //project2.setSentinel(true);
+
+ GreaterThanExpression gt1 = new GreaterThanExpression(innerPlan, project1, project2);
+
+ plan.add(load1);
+ plan.add(filter1);
+ plan.connect(load1, filter1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ typeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LogicalSchema endResultSchema = filter1.getSchema() ;
+ assertEquals(endResultSchema.getField(0).type, field1Type) ;
+ assertEquals(endResultSchema.getField(1).type, field2Type) ;
+
+ }
+
+ // Negative test
+ @Test
+ public void testFilterWithInnerPlan3() throws Throwable {
+
+ // Create outer plan
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1", DataType.INTEGER)) ;
+ fsList1.add(new FieldSchema("field2", DataType.LONG)) ;
+
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ;
+
+ // Create inner plan
+ LogicalExpressionPlan innerPlan = new LogicalExpressionPlan() ;
+
+ // filter
+ LOFilter filter1 = new LOFilter(plan) ;
+ filter1.setFilterPlan(innerPlan);
+
+ ProjectExpression project1 = new ProjectExpression(innerPlan, 0, 0, filter1) ;
+ ProjectExpression project2 = new ProjectExpression(innerPlan, 0, 1, filter1) ;
+
+ AddExpression add1 = new AddExpression(innerPlan, project1, project2) ;
+
+ plan.add(load1);
+ plan.add(filter1);
+ plan.connect(load1, filter1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ try {
+ typeChecker.visit();
+ }
+ catch (Exception t) {
+ // good
+ }
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+
+ // Simple project sort columns
+ @Test
+ public void testSortWithInnerPlan1() throws Throwable {
+
+ // Create outer plan
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1", DataType.LONG)) ;
+ fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+
+ // Create project inner plan #1
+ LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+ // Sort
+ LOSort sort1 = new LOSort(plan);
+ ProjectExpression project1 = new ProjectExpression(innerPlan1, 0, 1, sort1) ;
+
+ innerPlan1.add(project1) ;
+
+ // Create project inner plan #2
+ LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+ ProjectExpression project2 = new ProjectExpression(innerPlan2, 0, 0, sort1) ;
+
+ innerPlan2.add(project2) ;
+
+ // List of innerplans
+ List<LogicalExpressionPlan> innerPlans = new ArrayList<LogicalExpressionPlan>() ;
+ innerPlans.add(innerPlan1) ;
+ innerPlans.add(innerPlan2) ;
+
+ // List of ASC flags
+ List<Boolean> ascList = new ArrayList<Boolean>() ;
+ ascList.add(true);
+ ascList.add(true);
+
+ sort1.setAscendingCols(ascList);
+ sort1.setSortColPlans(innerPlans);
+
+ plan.add(load1);
+ plan.add(sort1);
+ plan.connect(load1, sort1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ typeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LogicalSchema endResultSchema = sort1.getSchema() ;
+
+ // outer
+ assertEquals(endResultSchema.getField(0).type, DataType.LONG) ;
+ assertEquals(endResultSchema.getField(1).type, DataType.INTEGER) ;
+
+ // inner
+ assertEquals(getSingleOutput(innerPlan1).getType(), DataType.INTEGER);
+ assertEquals(getSingleOutput(innerPlan2).getType(), DataType.LONG);
+
+ }
+
+
+
+ /**
+  * Returns the only source operator of the given expression plan, which the
+  * tests treat as the plan's output expression.
+  *
+  * @param innerPlan1 expression plan expected to have exactly one source
+  * @return that single source, cast to {@link LogicalExpression}
+  */
+ private LogicalExpression getSingleOutput(LogicalExpressionPlan innerPlan1) {
+     List<Operator> outputs = innerPlan1.getSources();
+     // expected value goes first so a failure reports "expected:<1> but was:<n>"
+     assertEquals("number of outputs in exp plan", 1, outputs.size());
+     return (LogicalExpression) outputs.get(0);
+ }
+
+ // Positive expression sort columns
+ @Test
+ public void testSortWithInnerPlan2() throws Throwable {
+
+ // Create outer plan
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+ fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+
+ LOSort sort1 = new LOSort(plan) ;
+
+
+ // Create expression inner plan #1
+ LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+ ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, sort1) ;
+ ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, sort1) ;
+
+ MultiplyExpression mul1 = new MultiplyExpression(innerPlan1, project11, project12) ;
+
+ // Create expression inner plan #2
+ LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+ ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, sort1) ;
+
+ ConstantExpression const21 = new ConstantExpression(innerPlan2, 26L) ;
+ ModExpression mod21 = new ModExpression(innerPlan2, project21, const21) ;
+
+ // List of innerplans
+ List<LogicalExpressionPlan> innerPlans = new ArrayList<LogicalExpressionPlan>() ;
+ innerPlans.add(innerPlan1) ;
+ innerPlans.add(innerPlan2) ;
+
+ // List of ASC flags
+ List<Boolean> ascList = new ArrayList<Boolean>() ;
+ ascList.add(true);
+ ascList.add(true);
+
+ // Sort
+ sort1.setAscendingCols(ascList);
+ sort1.setSortColPlans(innerPlans);
+
+ plan.add(load1);
+ plan.add(sort1);
+ plan.connect(load1, sort1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ typeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LogicalSchema endResultSchema = sort1.getSchema() ;
+
+ // outer
+ assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ;
+ assertEquals(endResultSchema.getField(1).type, DataType.INTEGER) ;
+
+ // inner
+ assertEquals(getSingleOutput(innerPlan1).getType(), DataType.INTEGER);
+ assertEquals(getSingleOutput(innerPlan2).getType(), DataType.LONG);
+
+ }
+
+ // Negative test on expression sort columns
+ @Test
+ public void testSortWithInnerPlan3() throws Throwable {
+
+ // Create outer plan
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+ fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+
+
+ // Sort
+ LOSort sort1 = new LOSort(plan) ;
+
+ // Create expression inner plan #1
+ LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+ ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, sort1) ;
+ ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, sort1) ;
+ MultiplyExpression mul1 = new MultiplyExpression(innerPlan1, project11, project12) ;
+
+
+ // Create expression inner plan #2
+ LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+ ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, sort1) ;
+ ConstantExpression const21 = new ConstantExpression(innerPlan2, "26") ;
+ ModExpression mod21 = new ModExpression(innerPlan2, project21, const21) ;
+
+ // List of innerplans
+ List<LogicalExpressionPlan> innerPlans = new ArrayList<LogicalExpressionPlan>() ;
+ innerPlans.add(innerPlan1) ;
+ innerPlans.add(innerPlan2) ;
+
+ // List of ASC flags
+ List<Boolean> ascList = new ArrayList<Boolean>() ;
+ ascList.add(true);
+ ascList.add(true);
+
+ // Sort
+ sort1.setAscendingCols(ascList);
+ sort1.setSortColPlans(innerPlans);
+
+ plan.add(load1);
+ plan.add(sort1);
+ plan.connect(load1, sort1) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ try {
+ typeChecker.visit();
+ fail("Error expected") ;
+ }
+ catch (Exception t) {
+ // good
+ }
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Error expected") ;
+ }
+
+ }
+
+
+ // Positive expression cond columns
+ @Test
+ public void testSplitWithInnerPlan1() throws Throwable {
+
+ // Create outer plan
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+ fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+
+
+ // split
+ LOSplit split1 = new LOSplit(plan);
+
+ // output1
+ LOSplitOutput splitOutput1 = new LOSplitOutput(plan) ;
+
+ // output2
+ LOSplitOutput splitOutput2 = new LOSplitOutput(plan) ;
+
+ // Create expression inner plan #1
+ LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+ ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, splitOutput1) ;
+
+ ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, splitOutput1) ;
+
+ NotEqualExpression notequal1 = new NotEqualExpression(innerPlan1, project11, project12) ;
+
+// innerPlan1.add(project11) ;
+// innerPlan1.add(project12) ;
+// innerPlan1.add(notequal1) ;
+//
+// innerPlan1.connect(project11, notequal1);
+// innerPlan1.connect(project12, notequal1);
+
+ // Create expression inner plan #2
+ LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+ ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, splitOutput2) ;
+
+ ConstantExpression const21 = new ConstantExpression(innerPlan2, 26L) ;
+ LessThanEqualExpression lesser21 = new LessThanEqualExpression(innerPlan2, project21, const21) ;
+//
+// innerPlan2.add(project21) ;
+// innerPlan2.add(const21) ;
+// innerPlan2.add(lesser21) ;
+//
+// innerPlan2.connect(project21, lesser21);
+// innerPlan2.connect(const21, lesser21) ;
+
+
+ splitOutput1.setFilterPlan(innerPlan1);
+ splitOutput2.setFilterPlan(innerPlan2);
+
+ plan.add(load1);
+ plan.add(split1);
+ plan.add(splitOutput1);
+ plan.add(splitOutput2);
+
+ plan.connect(load1, split1) ;
+ plan.connect(split1, splitOutput1) ;
+ plan.connect(split1, splitOutput2) ;
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ typeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ // check split itself
+ {
+ LogicalSchema endResultSchema1 = split1.getSchema() ;
+ // outer
+ assertEquals(endResultSchema1.getField(0).type, DataType.BYTEARRAY) ;
+ assertEquals(endResultSchema1.getField(1).type, DataType.INTEGER) ;
+ }
+
+ // check split output #1
+ {
+ LogicalSchema endResultSchema1 = splitOutput1.getSchema() ;
+ // outer
+ assertEquals(endResultSchema1.getField(0).type, DataType.BYTEARRAY) ;
+ assertEquals(endResultSchema1.getField(1).type, DataType.INTEGER) ;
+ }
+
+ // check split output #2
+ {
+ LogicalSchema endResultSchema2 = splitOutput2.getSchema() ;
+ // outer
+ assertEquals(endResultSchema2.getField(0).type, DataType.BYTEARRAY) ;
+ assertEquals(endResultSchema2.getField(1).type, DataType.INTEGER) ;
+ }
+
+ // inner conditions: all have to be boolean
+ assertEquals(getSingleOutput(innerPlan1).getType(), DataType.BOOLEAN);
+ assertEquals(getSingleOutput(innerPlan2).getType(), DataType.BOOLEAN);
+
+ }
+
+ /**
+  * Negative test: the condition of split output #2 is the arithmetic
+  * expression {@code field1 - 26L}, which does not evaluate to boolean.
+  * The type checker is expected to record an error in the collector; an
+  * exception thrown while visiting is tolerated.
+  */
+ @Test
+ public void testSplitWithInnerPlan2() throws Throwable {
+
+     // Create outer plan
+     printCurrentMethodName();
+     LogicalPlan plan = new LogicalPlan();
+
+     String pigStorage = PigStorage.class.getName();
+
+     LOLoad load1 = new LOLoad(
+             new FileSpec("pi", new FuncSpec(pigStorage)),
+             null, plan, null);
+
+     // schema for input#1
+     List<FieldSchema> fsList1 = new ArrayList<FieldSchema>();
+     fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY));
+     fsList1.add(new FieldSchema("field2", DataType.INTEGER));
+     Schema inputSchema1 = new Schema(fsList1);
+
+     // set schemas
+     load1.setScriptSchema(Util.translateSchema(inputSchema1));
+
+     // split and its two outputs
+     LOSplit split1 = new LOSplit(plan);
+     LOSplitOutput splitOutput1 = new LOSplitOutput(plan);
+     LOSplitOutput splitOutput2 = new LOSplitOutput(plan);
+
+     // Create expression inner plan #1: field1 != field2
+     // (the operator constructors are relied on to wire the nodes into the
+     // plan; no explicit add/connect is done here)
+     LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan();
+     ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, splitOutput1);
+     ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, splitOutput1);
+     new NotEqualExpression(innerPlan1, project11, project12);
+
+     // Create expression inner plan #2: field1 - 26L (deliberately non-boolean).
+     // The projection is attached to splitOutput2, the operator that owns
+     // innerPlan2; it was previously attached to splitOutput1 by copy/paste.
+     LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan();
+     ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, splitOutput2);
+     ConstantExpression const21 = new ConstantExpression(innerPlan2, 26L);
+     new SubtractExpression(innerPlan2, project21, const21);
+
+     splitOutput1.setFilterPlan(innerPlan1);
+     splitOutput2.setFilterPlan(innerPlan2);
+
+     plan.add(load1);
+     plan.add(split1);
+     plan.add(splitOutput1);
+     plan.add(splitOutput2);
+
+     plan.connect(load1, split1);
+     plan.connect(split1, splitOutput1);
+     plan.connect(split1, splitOutput2);
+
+     CompilationMessageCollector collector = new CompilationMessageCollector();
+     TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+     try {
+         typeChecker.visit();
+     }
+     catch (Exception expected) {
+         // An exception is acceptable here; the collector is checked below.
+     }
+     printMessageCollector(collector);
+     //printTypeGraph(plan) ;
+
+     if (!collector.hasError()) {
+         throw new AssertionError("Error expected");
+     }
+
+ }
+
+ // Positive test
+ @Test
+ public void testCOGroupWithInnerPlan1GroupByTuple1() throws Throwable {
+
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ LOLoad load2 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+ fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // schema for input#2
+ Schema inputSchema2 = null ;
+ {
+ List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+ fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+ fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+ inputSchema2 = new Schema(fsList2) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ;
+ load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+ load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+
+ LOCogroup cogroup1 = new LOCogroup(plan);
+
+ // Create expression inner plan #1 of input #1
+ LogicalExpressionPlan innerPlan11 = new LogicalExpressionPlan() ;
+ ProjectExpression project111 = new ProjectExpression(innerPlan11, 0, 0, cogroup1) ;
+ ConstantExpression const111 = new ConstantExpression(innerPlan11, 26F) ;
+ SubtractExpression subtract111 = new SubtractExpression(innerPlan11, project111, const111) ;
+
+// innerPlan11.add(project111) ;
+// innerPlan11.add(const111) ;
+// innerPlan11.add(subtract111) ;
+//
+// innerPlan11.connect(project111, subtract111);
+// innerPlan11.connect(const111, subtract111) ;
+
+ // Create expression inner plan #2 of input #1
+ LogicalExpressionPlan innerPlan21 = new LogicalExpressionPlan() ;
+ ProjectExpression project211 = new ProjectExpression(innerPlan21, 0, 0, cogroup1) ;
+ ProjectExpression project212 = new ProjectExpression(innerPlan21, 0, 1, cogroup1) ;
+
+ AddExpression add211 = new AddExpression(innerPlan21, project211, project212) ;
+
+// innerPlan21.add(project211) ;
+// innerPlan21.add(project212) ;
+// innerPlan21.add(add211) ;
+//
+// innerPlan21.connect(project211, add211);
+// innerPlan21.connect(project212, add211) ;
+
+ // Create expression inner plan #1 of input #2
+ LogicalExpressionPlan innerPlan12 = new LogicalExpressionPlan() ;
+ ProjectExpression project121 = new ProjectExpression(innerPlan12, 1, 0, cogroup1) ;
+ ConstantExpression const121 = new ConstantExpression(innerPlan12, 26) ;
+ SubtractExpression subtract121 = new SubtractExpression(innerPlan12, project121, const121) ;
+
+// innerPlan12.add(project121) ;
+// innerPlan12.add(const121) ;
+// innerPlan12.add(subtract121) ;
+//
+// innerPlan12.connect(project121, subtract121);
+// innerPlan12.connect(const121, subtract121) ;
+
+ // Create expression inner plan #2 of input #2
+ LogicalExpressionPlan innerPlan22 = new LogicalExpressionPlan() ;
+ ConstantExpression const122 = new ConstantExpression(innerPlan22, 26) ;
+// innerPlan22.add(const122) ;
+
+ // Create Cogroup
+ ArrayList<LogicalRelationalOperator> inputs = new ArrayList<LogicalRelationalOperator>() ;
+ inputs.add(load1) ;
+ inputs.add(load2) ;
+
+ MultiMap<Integer, LogicalExpressionPlan> maps
+ = new MultiMap<Integer, LogicalExpressionPlan>() ;
+ maps.put(0, innerPlan11);
+ maps.put(0, innerPlan21);
+ maps.put(1, innerPlan12);
+ maps.put(1, innerPlan22);
+
+ boolean[] isInner = new boolean[inputs.size()] ;
+ for (int i=0; i < isInner.length ; i++) {
+ isInner[i] = false ;
+ }
+
+ cogroup1.setInnerFlags(isInner);
+ cogroup1.setExpressionPlans(maps);
+
+ // construct the main plan
+ plan.add(load1) ;
+ plan.add(load2) ;
+ plan.add(cogroup1) ;
+
+ plan.connect(load1, cogroup1);
+ plan.connect(load2, cogroup1);
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ typeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ // check outer schema
+ LogicalSchema endResultSchema = cogroup1.getSchema() ;
+
+ // Tuple group column
+ assertEquals(endResultSchema.getField(0).type, DataType.TUPLE) ;
+ assertEquals(endResultSchema.getField(0).schema.getField(0).type, DataType.DOUBLE) ;
+ assertEquals(endResultSchema.getField(0).schema.getField(1).type, DataType.LONG);
+
+ assertEquals(endResultSchema.getField(1).type, DataType.BAG) ;
+ assertEquals(endResultSchema.getField(2).type, DataType.BAG) ;
+
+ // check inner schema1
+ LogicalSchema innerSchema1 = endResultSchema.getField(1).schema.getField(0).schema ;
+ assertEquals(innerSchema1.getField(0).type, DataType.INTEGER);
+ assertEquals(innerSchema1.getField(1).type, DataType.LONG);
+
+ // check inner schema2
+ LogicalSchema innerSchema2 = endResultSchema.getField(2).schema.getField(0).schema ;
+ assertEquals(innerSchema2.getField(0).type, DataType.DOUBLE);
+ assertEquals(innerSchema2.getField(1).type, DataType.INTEGER);
+
+ // check group by col end result
+ assertEquals(getSingleOutput(innerPlan11).getType(), DataType.DOUBLE) ;
+ assertEquals(getSingleOutput(innerPlan21).getType(), DataType.LONG) ;
+ assertEquals(getSingleOutput(innerPlan12).getType(), DataType.DOUBLE) ;
+ assertEquals(getSingleOutput(innerPlan22).getType(), DataType.LONG) ;
+ }
+
+
+ // Positive test
+ @Test
+ public void testCOGroupWithInnerPlan1GroupByAtom1() throws Throwable {
+
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ LOLoad load2 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+ fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // schema for input#2
+ Schema inputSchema2 = null ;
+ {
+ List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+ fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+ fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+ inputSchema2 = new Schema(fsList2) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+ load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+
+ LOCogroup cogroup1 = new LOCogroup(plan);
+
+ // Create expression inner plan #1 of input #1
+ LogicalExpressionPlan innerPlan11 = new LogicalExpressionPlan() ;
+ ProjectExpression project111 = new ProjectExpression(innerPlan11, 0, 0, cogroup1) ;
+ ConstantExpression const111 = new ConstantExpression(innerPlan11, 26F) ;
+ SubtractExpression subtract111 = new SubtractExpression(innerPlan11, project111, const111) ;
+
+// innerPlan11.add(project111) ;
+// innerPlan11.add(const111) ;
+// innerPlan11.add(subtract111) ;
+//
+// innerPlan11.connect(project111, subtract111);
+// innerPlan11.connect(const111, subtract111) ;
+
+ // Create expression inner plan #1 of input #2
+ LogicalExpressionPlan innerPlan12 = new LogicalExpressionPlan() ;
+ ProjectExpression project121 = new ProjectExpression(innerPlan12, 1, 0, cogroup1) ;
+ ConstantExpression const121 = new ConstantExpression(innerPlan12, 26) ;
+ SubtractExpression subtract121 = new SubtractExpression(innerPlan12, project121, const121) ;
+
+// innerPlan12.add(project121) ;
+// innerPlan12.add(const121) ;
+// innerPlan12.add(subtract121) ;
+//
+// innerPlan12.connect(project121, subtract121);
+// innerPlan12.connect(const121, subtract121) ;
+
+ // Create Cogroup
+ ArrayList<LogicalRelationalOperator> inputs = new ArrayList<LogicalRelationalOperator>() ;
+ inputs.add(load1) ;
+ inputs.add(load2) ;
+
+ MultiMap<Integer, LogicalExpressionPlan> maps
+ = new MultiMap<Integer, LogicalExpressionPlan>() ;
+ maps.put(0, innerPlan11);
+ maps.put(1, innerPlan12);
+
+ boolean[] isInner = new boolean[inputs.size()] ;
+ for (int i=0; i < isInner.length ; i++) {
+ isInner[i] = false ;
+ }
+
+
+ cogroup1.setInnerFlags(isInner);
+ cogroup1.setExpressionPlans(maps);
+
+ // construct the main plan
+ plan.add(load1) ;
+ plan.add(load2) ;
+ plan.add(cogroup1) ;
+
+ plan.connect(load1, cogroup1);
+ plan.connect(load2, cogroup1);
+
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+ typeChecker.visit();
+ printMessageCollector(collector) ;
+ //printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ // check outer schema
+ LogicalSchema endResultSchema = cogroup1.getSchema() ;
+
+ // Tuple group column
+ assertEquals(endResultSchema.getField(0).type, DataType.DOUBLE) ;
+
+ assertEquals(endResultSchema.getField(1).type, DataType.BAG) ;
+ assertEquals(endResultSchema.getField(2).type, DataType.BAG) ;
+
+ // check inner schema1
+ LogicalSchema innerSchema1 = endResultSchema.getField(1).schema.getField(0).schema ;
+ assertEquals(innerSchema1.getField(0).type, DataType.INTEGER);
+ assertEquals(innerSchema1.getField(1).type, DataType.LONG);
+
+ // check inner schema2
+ LogicalSchema innerSchema2 = endResultSchema.getField(2).schema.getField(0).schema ;
+ assertEquals(innerSchema2.getField(0).type, DataType.DOUBLE);
+ assertEquals(innerSchema2.getField(1).type, DataType.INTEGER);
+
+ // check group by col end result
+ assertEquals(getSingleOutput(innerPlan11).getType(), DataType.DOUBLE) ;
+ assertEquals(getSingleOutput(innerPlan12).getType(), DataType.DOUBLE) ;
+ }
+
+
+ // Positive test
+ @Test
+ public void testCOGroupWithInnerPlan1GroupByIncompatibleAtom1() throws Throwable {
+
+ printCurrentMethodName();
+ LogicalPlan plan = new LogicalPlan() ;
+
+ String pigStorage = PigStorage.class.getName() ;
+
+ LOLoad load1 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ LOLoad load2 = new LOLoad(
+ new FileSpec("pi", new FuncSpec(pigStorage)),
+ null, plan, null
+ );
+
+ // schema for input#1
+ Schema inputSchema1 = null ;
+ {
+ List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+ fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+ fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+ inputSchema1 = new Schema(fsList1) ;
+ }
+
+ // schema for input#2
+ Schema inputSchema2 = null ;
+ {
+ List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+ fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+ fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+ inputSchema2 = new Schema(fsList2) ;
+ }
+
+ // set schemas
+ load1.setScriptSchema(Util.translateSchema((inputSchema1))) ;
+ load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+
+ LOCogroup cogroup1 = new LOCogroup(plan);
+ // Create expression inner plan #1
+ LogicalExpressionPlan innerPlan11 = new LogicalExpressionPlan() ;
+ ProjectExpression project111 = new ProjectExpression(innerPlan11, 0, 0, cogroup1) ;
+ ConstantExpression const111 = new ConstantExpression(innerPlan11, 26F) ;
+ SubtractExpression subtract111 = new SubtractExpression(innerPlan11, project111, const111) ;
+
+// innerPlan11.add(project111) ;
+// innerPlan11.add(const111) ;
+// innerPlan11.add(subtract111) ;
+//
+// innerPlan11.connect(project111, subtract111);
+// innerPlan11.connect(const111, subtract111) ;
+
+ // Create expression inner plan #2
+ LogicalExpressionPlan innerPlan12 = new LogicalExpressionPlan() ;
+ ConstantExpression const121 = new ConstantExpression(innerPlan12, 26) ;
+// innerPlan12.add(const121) ;
+
+ // Create Cogroup
+ ArrayList<LogicalRelationalOperator> inputs = new ArrayList<LogicalRelationalOperator>() ;
+ inputs.add(load1) ;
+ inputs.add(load2) ;
+
+ MultiMap<Integer, LogicalExpressionPlan> maps
+ = new MultiMap<Integer, LogicalExpressionPlan>() ;
+ maps.put(0, innerPlan11);
+ maps.put(1, innerPlan12);
+
+ boolean[] isInner = new boolean[inputs.size()] ;
+ for (int i=0; i < isInner.length ; i++) {
[... 3083 lines stripped ...]