You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2016/06/14 19:14:39 UTC
svn commit: r1748451 - in /pig/trunk: ./
src/org/apache/pig/newplan/logical/relational/
src/org/apache/pig/newplan/logical/visitor/ test/org/apache/pig/test/
Author: knoguchi
Date: Tue Jun 14 19:14:39 2016
New Revision: 1748451
URL: http://svn.apache.org/viewvc?rev=1748451&view=rev
Log:
PIG-2315: Make as clause work in generate (daijy via knoguchi)
Added:
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jun 14 19:14:39 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2315: Make as clause work in generate (daijy via knoguchi)
+
PIG-4921: Kill running jobs on InterruptedException (rohini)
PIG-4916: Pig on Tez fail to remove temporary HDFS files in some cases (daijy)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Tue Jun 14 19:14:39 2016
@@ -37,6 +37,7 @@ public class LOGenerate extends LogicalR
// to store uid in mUserDefinedSchema
private List<LogicalSchema> mUserDefinedSchema = null;
private List<LogicalSchema> outputPlanSchemas = null;
+ private List<LogicalSchema> expSchemas = null;
// If LOGenerate generate new uid, cache it here.
// This happens when expression plan does not have complete schema, however,
// user give complete schema in ForEach statement in script
@@ -71,6 +72,7 @@ public class LOGenerate extends LogicalR
schema = new LogicalSchema();
outputPlanSchemas = new ArrayList<LogicalSchema>();
+ expSchemas = new ArrayList<LogicalSchema>();
for(int i=0; i<outputPlans.size(); i++) {
LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
@@ -97,8 +99,6 @@ public class LOGenerate extends LogicalR
// if type is primitive, just add to schema
if (fieldSchema!=null)
expSchema.addField(fieldSchema);
- else
- expSchema = null;
} else {
// if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
if (fieldSchema.schema==null) {
@@ -137,6 +137,7 @@ public class LOGenerate extends LogicalR
if (expSchema!=null && expSchema.size()==0)
expSchema = null;
LogicalSchema planSchema = new LogicalSchema();
+ expSchemas.add(expSchema);
if (mUserDefinedSchemaCopy!=null) {
LogicalSchema mergedSchema = new LogicalSchema();
// merge with userDefinedSchema
@@ -146,12 +147,6 @@ public class LOGenerate extends LogicalR
fs.stampFieldSchema();
mergedSchema.addField(new LogicalFieldSchema(fs));
}
- for (LogicalFieldSchema fs : mergedSchema.getFields()) {
- if (fs.type == DataType.NULL){
- //this is the use case where a new alias has been specified by user
- fs.type = DataType.BYTEARRAY;
- }
- }
} else {
// Merge uid with the exp field schema
@@ -163,8 +158,12 @@ public class LOGenerate extends LogicalR
mergedSchema.mergeUid(expSchema);
}
- for (LogicalFieldSchema fs : mergedSchema.getFields())
+ for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+ if (fs.type==DataType.NULL) {
+ fs.type = DataType.BYTEARRAY;
+ }
planSchema.addField(fs);
+ }
} else {
// if any plan do not have schema, the whole LOGenerate do not have schema
if (expSchema==null) {
@@ -310,4 +309,8 @@ public class LOGenerate extends LogicalR
super.resetSchema();
outputPlanSchemas = null;
}
+
+    /**
+     * Returns the expression schemas collected the last time this operator's
+     * schema was computed, in output-plan order. An entry is null when that
+     * expression produced no usable schema (an empty schema is stored as null).
+     * Presumably there is one entry per output plan -- TODO confirm that no
+     * path in the schema computation skips the add.
+     *
+     * @return the collected expression schemas, or null if the schema has not
+     *         been computed yet
+     */
+    public List<LogicalSchema> getExpSchemas() {
+        return this.expSchemas;
+    }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Tue Jun 14 19:14:39 2016
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.vi
import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor;
+import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor;
import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor;
import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor;
import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
@@ -175,6 +176,7 @@ public class LogicalPlan extends BaseOpe
new ColumnAliasConversionVisitor(this).visit();
new SchemaAliasVisitor(this).visit();
new ScalarVisitor(this, pigContext, scope).visit();
+ new ForEachUserSchemaVisitor(this).visit();
// ImplicitSplitInsertVisitor has to be called before
// DuplicateForEachColumnRewriteVisitor. Detail at pig-1766
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Tue Jun 14 19:14:39 2016
@@ -150,6 +150,39 @@ public class LogicalSchema {
}
return true;
}
+
+        /**
+         * Checks whether two field schemas are equal with regard to type only;
+         * aliases are not compared. For types where {@link DataType#isComplex}
+         * is true, the inner schemas are compared recursively, field by field.
+         *
+         * @param fs1 first field schema, may be null
+         * @param fs2 second field schema, may be null
+         * @return true if both are null, or if both have the same type and (for
+         *         complex types) either both inner schemas are null or they have
+         *         the same size and pairwise matching fields; false otherwise
+         */
+        public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) {
+            if (fs1 == null || fs2 == null) {
+                // Equal only when both are null
+                return fs1 == fs2;
+            }
+            if (fs1.type != fs2.type) {
+                return false;
+            }
+            if (DataType.isComplex(fs1.type)) {
+                LogicalSchema s1 = fs1.schema;
+                LogicalSchema s2 = fs2.schema;
+                // The inner schemas (not the field schemas) are what may be null
+                // here; previously only fs1/fs2 were re-checked, so a single null
+                // inner schema led to an NPE on size().
+                if (s1 == null || s2 == null) {
+                    return s1 == s2;
+                }
+                if (s1.size() != s2.size()) {
+                    return false;
+                }
+                for (int i = 0; i < s1.size(); i++) {
+                    if (!typeMatch(s1.getField(i), s2.getField(i))) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
/**
* Adds the uid from FieldSchema argument to this FieldSchema
* If the argument is null, it stamps this FieldSchema with uid
Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1748451&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Tue Jun 14 19:14:39 2016
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * Makes the types declared in a FOREACH ... GENERATE "as" clause take effect.
+ * <p>
+ * When a user-declared type differs from the type of the generated expression,
+ * a new "caster" LOForEach that applies explicit casts is placed directly after
+ * the visited foreach, and the declared types on the original LOGenerate are
+ * reset to {@link DataType#NULL} so only the declared aliases remain there.
+ */
+public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor {
+    /**
+     * Creates a visitor that walks {@code plan} in dependency order.
+     *
+     * @param plan the logical plan to rewrite in place
+     * @throws FrontendException propagated from the superclass constructor
+     */
+    public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    /**
+     * Inserts a caster foreach after {@code foreach} when at least one generated
+     * column needs a cast to honor the user defined schema.
+     * <p>
+     * A column is cast when its user-declared type is neither NULL nor bytearray
+     * and either the expression has no schema or its type does not
+     * {@link LogicalFieldSchema#typeMatch match} the declared one.
+     *
+     * @param foreach the foreach being visited
+     * @throws FrontendException if the numbers of expression schemas and user
+     *         defined schemas differ, if an expression schema and its user
+     *         defined schema have different sizes, or if the plan cannot be rewired
+     */
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException {
+        LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
+        List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema();
+
+        // Skip if no way to figure out schema (usually both expression schema and
+        // user defined schema are null)
+        if (foreach.getSchema()==null) {
+            return;
+        }
+
+        // LOGenerate (re)builds its expression-schema list while computing its
+        // schema, which foreach.getSchema() presumably triggers. Read the list
+        // only now; a reference taken earlier could be null or stale.
+        List<LogicalSchema> mExpSchemas = generate.getExpSchemas();
+
+        if (mUserDefinedSchemas==null || mExpSchemas==null) {
+            return;
+        }
+
+        boolean hasUserDefinedSchema = false;
+        for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) {
+            if (mUserDefinedSchema!=null) {
+                hasUserDefinedSchema = true;
+                break;
+            }
+        }
+
+        if (!hasUserDefinedSchema) {
+            return;
+        }
+
+        if (mExpSchemas.size()!=mUserDefinedSchemas.size()) {
+            throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() +
+                    " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas",
+                    0, generate.getLocation());
+        }
+
+        // Candidate caster: one inner load + projection (optionally cast) per
+        // output column of the original foreach. It is only wired into the plan
+        // if at least one cast turns out to be necessary.
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOForEach casterForEach = new LOForEach(plan);
+        casterForEach.setInnerPlan(innerPlan);
+        casterForEach.setAlias(foreach.getAlias());
+
+        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+        LOGenerate gen = new LOGenerate(innerPlan, exps, null);
+        innerPlan.add(gen);
+
+        // index counts output columns of the original foreach; one generate
+        // plan may contribute several columns (e.g. a flatten with a multi-field
+        // user schema), so it advances independently of i.
+        int index = 0;
+        boolean needCast = false;
+        for (int i=0; i<mExpSchemas.size(); i++) {
+            LogicalSchema mExpSchema = mExpSchemas.get(i);
+            LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i);
+
+            // Use user defined schema to cast, this is the prevailing use case
+            if (mExpSchema==null) {
+                for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) {
+                    if (fs.type==DataType.NULL || fs.type==DataType.BYTEARRAY) {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    } else {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, true, fs);
+                        needCast = true;
+                    }
+                    index++;
+                }
+                continue;
+            }
+
+            // No user defined schema, no need to cast
+            if (mUserDefinedSchema==null) {
+                for (int j=0; j<mExpSchema.size(); j++) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    index++;
+                }
+                continue;
+            }
+
+            // Expression has schema, but user also define schema, need cast only
+            // when there is a mismatch
+            if (mExpSchema.size()!=mUserDefinedSchema.size()) {
+                throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() +
+                        " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation());
+            }
+
+            for (int j=0; j<mExpSchema.size(); j++) {
+                LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j);
+                LogicalFieldSchema mUserDefinedFieldSchema = mUserDefinedSchema.getField(j);
+                if (mUserDefinedFieldSchema.type==DataType.NULL ||
+                        mUserDefinedFieldSchema.type==DataType.BYTEARRAY ||
+                        LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                } else {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema);
+                    needCast = true;
+                }
+                index++;
+            }
+        }
+
+        gen.setFlattenFlags(new boolean[index]);
+        if (needCast) {
+            insertCaster(foreach, casterForEach);
+
+            // Since the explicit cast is now inserted after the original foreach,
+            // throw away the user defined "types" but keep the user defined
+            // names on the original foreach. 'generate' (LOGenerate) still holds
+            // the reference to these mUserDefinedSchemas.
+            for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) {
+                if (mUserDefinedSchema != null) {
+                    for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) {
+                        fs.type = DataType.NULL;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Wires {@code casterForEach} in directly below {@code foreach} so that
+     * every consumer of the original foreach reads the cast columns instead.
+     *
+     * @param foreach       the original foreach
+     * @param casterForEach the caster foreach to insert (not yet in the plan)
+     * @throws FrontendException if the plan cannot be rewired
+     */
+    private void insertCaster(LOForEach foreach, LOForEach casterForEach)
+            throws FrontendException {
+        List<Operator> successors = plan.getSuccessors(foreach);
+        if (successors == null || successors.isEmpty()) {
+            // Nothing consumes foreach yet: hang the caster below it rather than
+            // dereferencing a successor that does not exist.
+            plan.add(casterForEach);
+            plan.connect(foreach, casterForEach);
+            return;
+        }
+        // Copy first: rewiring mutates the plan's successor list of foreach.
+        List<Operator> consumers = new ArrayList<Operator>(successors);
+        plan.insertBetween(foreach, casterForEach, consumers.get(0));
+        // The alias may have several consumers here (implicit splits are
+        // presumably only inserted by a later visitor), so redirect the rest
+        // too, keeping each consumer's input position unchanged.
+        for (int i = 1; i < consumers.size(); i++) {
+            Operator consumer = consumers.get(i);
+            int inputPos = plan.disconnect(foreach, consumer).second;
+            plan.connect(casterForEach, i, consumer, inputPos);
+        }
+    }
+
+    /**
+     * Appends one output column to the caster foreach: an LOInnerLoad of column
+     * {@code index} connected to {@code gen}, and an expression plan projecting
+     * it, wrapped in a cast to {@code fs} when {@code needCaster} is true.
+     *
+     * @param casterForEach the caster foreach the inner load belongs to
+     * @param innerPlan     the caster's inner plan
+     * @param gen           the caster's LOGenerate
+     * @param exps          the list given to {@code gen} as its output plans;
+     *                      the new expression plan is appended to it
+     * @param index         column position in the original foreach's output
+     * @param needCaster    whether to wrap the projection in a cast
+     * @param fs            target field schema of the cast; only read when
+     *                      {@code needCaster} is true (may be null otherwise)
+     */
+    private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen,
+            List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) {
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index);
+        innerPlan.add(innerLoad);
+        innerPlan.connect(innerLoad, gen);
+
+        LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+        ProjectExpression prj = new ProjectExpression(exp, index, 0, gen);
+        exp.add(prj);
+
+        if (needCaster) {
+            CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
+            exp.add(cast);
+        }
+        exps.add(exp);
+    }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java Tue Jun 14 19:14:39 2016
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertNull;
import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
@@ -36,15 +39,22 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOStore;
@@ -311,4 +321,192 @@ public class TestPlanGeneration {
assertNotNull(((PartitionedLoader)loLoad.getLoadFunc()).getPartFilter());
assertEquals("b", loStore.getAlias());
}
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast1() throws Exception {
+        // A cast ForEach is inserted to take care of the user schema
+        String query = "A = load 'foo' as (a, b:int);\n" +
+                "B = foreach A generate a as a0:chararray, b as b:int;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        // before a0 is cast to chararray, it should be bytearray
+        assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(0).type);
+        // type of b should stay as int
+        assertEquals(DataType.INTEGER, loForEach1.getSchema().getField(1).type);
+        assertEquals("B", loForEach2.getAlias());
+        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        CastExpression cast = (CastExpression)generate.getOutputPlans().get(0).getSources().get(0);
+        // assertEquals (expected first) reports both values on failure
+        assertEquals(DataType.CHARARRAY, cast.getType());
+        assertEquals("a0", loForEach2.getSchema().getField(0).alias);
+        Assert.assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast2() throws Exception {
+        // The expression is already cast to the declared type (chararray), so no
+        // additional cast ForEach is inserted; only the alias is taken over
+        String query = "A = load 'foo' as (a, b);\n" +
+                "B = foreach A generate (chararray)a as a0:chararray;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        // expected value first so a failure message reads correctly
+        assertEquals("a0", loForEach.getSchema().getField(0).alias);
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast3() throws Exception {
+        // The expression is cast to chararray but declared as int, so an
+        // additional cast ForEach (chararray -> int) is inserted after it
+        String query = "A = load 'foo' as (a, b);\n" +
+                "B = foreach A generate (chararray)a as a0:int;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        LOGenerate generate1 = (LOGenerate)loForEach1.getInnerPlan().getSinks().get(0);
+        CastExpression cast1 = (CastExpression)generate1.getOutputPlans().get(0).getSources().get(0);
+        assertEquals(DataType.CHARARRAY, cast1.getType());
+        // before a0 is cast to int, it should be chararray
+        assertEquals(DataType.CHARARRAY, loForEach1.getSchema().getField(0).type);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        LOGenerate generate2 = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        CastExpression cast2 = (CastExpression)generate2.getOutputPlans().get(0).getSources().get(0);
+        assertEquals(DataType.INTEGER, cast2.getType());
+        Assert.assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast4() throws Exception {
+        // The "as" clause only names the flattened columns (no types), so the
+        // plan must stay load -> foreach -> store without an extra cast ForEach
+        String script = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n"
+                + "b = foreach a generate flatten(nb1) as (year, name), nb2;\n"
+                + "store b into 'output';";
+
+        LogicalPlan logicalPlan = Util.parse(script, pc);
+        Util.optimizeNewLP(logicalPlan);
+
+        LOLoad source = (LOLoad) logicalPlan.getSources().get(0);
+        LOForEach onlyForEach = (LOForEach) logicalPlan.getSuccessors(source).get(0);
+        Operator next = logicalPlan.getSuccessors(onlyForEach).get(0);
+        Assert.assertTrue(next instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast5() throws Exception {
+        // cast ForEach will be inserted: "name" is declared chararray while the
+        // flattened column has no type information
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name:chararray), nb2 as nb2:chararray;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        // flattened "name" field should be bytearray before it is cast to chararray
+        assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(1).type);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        Assert.assertTrue(generate.getOutputPlans().get(0).getSources().get(0) instanceof ProjectExpression);
+        CastExpression cast = (CastExpression)generate.getOutputPlans().get(1).getSources().get(0);
+        assertEquals(DataType.CHARARRAY, cast.getType());
+        Assert.assertTrue(generate.getOutputPlans().get(2).getSources().get(0) instanceof ProjectExpression);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast6() throws Exception {
+        // no cast ForEach will be inserted: the "as" clause only renames
+        String query = "a = load 'foo' as (nb1:bag{(year,name)}, nb2);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name2), nb2;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        // expected value first so a failure message reads correctly
+        assertEquals("name2", loForEach.getSchema().getField(1).alias);
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast7() throws Exception {
+        // flatten(nb1) has no inner schema, so the number of columns it yields is
+        // unknown; no cast ForEach can be built and the plan stays
+        // load -> foreach -> store
+        String script = "a = load 'foo' as (nb1:bag{}, nb2:bag{});\n"
+                + "b = foreach a generate flatten(nb1), flatten(nb2) as (year, name);\n"
+                + "store b into 'output';";
+
+        LogicalPlan logicalPlan = Util.parse(script, pc);
+        Util.optimizeNewLP(logicalPlan);
+
+        LOLoad source = (LOLoad) logicalPlan.getSources().get(0);
+        LOForEach onlyForEach = (LOForEach) logicalPlan.getSuccessors(source).get(0);
+        Operator next = logicalPlan.getSuccessors(onlyForEach).get(0);
+        Assert.assertTrue(next instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testAsType1() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        try {
+            Data data = Storage.resetData(pig);
+            data.set("input", tuple(0.1), tuple(1.2), tuple(2.3));
+
+            String query =
+                    "A = load 'input' USING mock.Storage() as (a1:double);\n"
+                    + "B = FOREACH A GENERATE a1 as (a2:int);\n"
+                    + "store B into 'out' using mock.Storage;" ;
+
+            Util.registerMultiLineQuery(pig, query);
+            List<Tuple> list = data.get("out");
+            // Without PIG-2315, the declared int type was ignored and this
+            // produced (0.1), (1.2), (2.3)
+            List<Tuple> expectedRes =
+                    Util.getTuplesFromConstantTupleStrings(
+                            new String[] {"(0)", "(1)", "(2)"});
+            Util.checkQueryOutputsAfterSort(list, expectedRes);
+        } finally {
+            // Release the PigServer even when an assertion fails
+            pig.shutdown();
+        }
+    }
+
+    @Test
+    // See PIG-2315
+    public void testAsType2() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        try {
+            Data data = Storage.resetData(pig);
+            data.set("input", tuple("a"), tuple("b"), tuple("c"));
+
+            // B2 is generated from B (previously from A, leaving B unused)
+            String query =
+                    "A = load 'input' USING mock.Storage(); \n"
+                    + "A2 = FOREACH A GENERATE 12345 as (a2:chararray); \n"
+                    + "B = load 'input' USING mock.Storage(); \n"
+                    + "B2 = FOREACH B GENERATE '12345' as (b2:chararray); \n"
+                    + "C = union A2, B2;\n"
+                    + "D = distinct C;\n"
+                    + "store D into 'out' using mock.Storage;" ;
+
+            Util.registerMultiLineQuery(pig, query);
+            List<Tuple> list = data.get("out");
+            // Without PIG-2315, this produced TWO 12345.
+            // One by chararray and another by int.
+            List<Tuple> expectedRes =
+                    Util.getTuplesFromConstantTupleStrings(
+                            new String[] {"('12345')"});
+            Util.checkQueryOutputsAfterSort(list, expectedRes);
+        } finally {
+            // Release the PigServer even when an assertion fails
+            pig.shutdown();
+        }
+    }
}