You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2016/06/14 19:14:39 UTC

svn commit: r1748451 - in /pig/trunk: ./ src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/newplan/logical/visitor/ test/org/apache/pig/test/

Author: knoguchi
Date: Tue Jun 14 19:14:39 2016
New Revision: 1748451

URL: http://svn.apache.org/viewvc?rev=1748451&view=rev
Log:
PIG-2315: Make as clause work in generate (daijy via knoguchi)

Added:
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jun 14 19:14:39 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-2315: Make as clause work in generate (daijy via knoguchi)
+
 PIG-4921: Kill running jobs on InterruptedException (rohini)
 
 PIG-4916: Pig on Tez fail to remove temporary HDFS files in some cases (daijy)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Tue Jun 14 19:14:39 2016
@@ -37,6 +37,7 @@ public class LOGenerate extends LogicalR
      // to store uid in mUserDefinedSchema
      private List<LogicalSchema> mUserDefinedSchema = null;
      private List<LogicalSchema> outputPlanSchemas = null;
+     private List<LogicalSchema> expSchemas = null;
      // If LOGenerate generate new uid, cache it here.
      // This happens when expression plan does not have complete schema, however,
      // user give complete schema in ForEach statement in script
@@ -71,6 +72,7 @@ public class LOGenerate extends LogicalR
         
         schema = new LogicalSchema();
         outputPlanSchemas = new ArrayList<LogicalSchema>();
+        expSchemas = new ArrayList<LogicalSchema>();
         
         for(int i=0; i<outputPlans.size(); i++) {
             LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
@@ -97,8 +99,6 @@ public class LOGenerate extends LogicalR
                     // if type is primitive, just add to schema
                     if (fieldSchema!=null)
                         expSchema.addField(fieldSchema);
-                    else
-                        expSchema = null;
                 } else {
                     // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
                     if (fieldSchema.schema==null) {
@@ -137,6 +137,7 @@ public class LOGenerate extends LogicalR
             if (expSchema!=null && expSchema.size()==0)
                 expSchema = null;
             LogicalSchema planSchema = new LogicalSchema();
+            expSchemas.add(expSchema);
             if (mUserDefinedSchemaCopy!=null) {
                 LogicalSchema mergedSchema = new LogicalSchema();
                 // merge with userDefinedSchema
@@ -146,12 +147,6 @@ public class LOGenerate extends LogicalR
                         fs.stampFieldSchema();
                         mergedSchema.addField(new LogicalFieldSchema(fs));
                     }
-                    for (LogicalFieldSchema fs : mergedSchema.getFields()) {
-                        if (fs.type == DataType.NULL){
-                            //this is the use case where a new alias has been specified by user
-                            fs.type = DataType.BYTEARRAY;
-                        }
-                    }
                 } else {
 
                     // Merge uid with the exp field schema
@@ -163,8 +158,12 @@ public class LOGenerate extends LogicalR
                     mergedSchema.mergeUid(expSchema);
 
                 }
-                for (LogicalFieldSchema fs : mergedSchema.getFields())
+                for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+                    if (fs.type==DataType.NULL) {
+                        fs.type = DataType.BYTEARRAY;
+                    }
                     planSchema.addField(fs);
+                }
             } else {
                 // if any plan do not have schema, the whole LOGenerate do not have schema
                 if (expSchema==null) {
@@ -310,4 +309,8 @@ public class LOGenerate extends LogicalR
         super.resetSchema();
         outputPlanSchemas = null;
     }
+
+    public List<LogicalSchema> getExpSchemas() {
+        return expSchemas;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Tue Jun 14 19:14:39 2016
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.vi
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
 import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
 import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor;
+import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor;
 import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor;
 import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor;
 import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
@@ -175,6 +176,7 @@ public class LogicalPlan extends BaseOpe
         new ColumnAliasConversionVisitor(this).visit();
         new SchemaAliasVisitor(this).visit();
         new ScalarVisitor(this, pigContext, scope).visit();
+        new ForEachUserSchemaVisitor(this).visit();
 
         // ImplicitSplitInsertVisitor has to be called before
         // DuplicateForEachColumnRewriteVisitor.  Detail at pig-1766

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Tue Jun 14 19:14:39 2016
@@ -150,6 +150,39 @@ public class LogicalSchema {
             }
             return true;
         }
+
+        // Check if fs1 is equal to fs2 with regard to type
+        public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) {
+            if (fs1==null && fs2==null) {
+                return true;
+            }
+            if (fs1==null || fs2==null) {
+                return false;
+            }
+            if (fs1.type!=fs2.type) {
+                return false;
+            }
+            if (DataType.isComplex(fs1.type)) {
+                LogicalSchema s1 = fs1.schema;
+                LogicalSchema s2 = fs2.schema;
+                if (s1==null && s2==null) {
+                    return true;
+                }
+                if (fs1==null || fs2==null) {
+                    return false;
+                }
+                if (s1.size()!=s2.size()) {
+                    return false;
+                }
+                for (int i=0;i<s1.size();i++) {
+                    if (!typeMatch(s1.getField(i), s2.getField(i))) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
         /**
          * Adds the uid from FieldSchema argument to this FieldSchema
          * If the argument is null, it stamps this FieldSchema with uid

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1748451&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Tue Jun 14 19:14:39 2016
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor {
+    public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException {
+        LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
+        List<LogicalSchema> mExpSchemas = generate.getExpSchemas();
+        List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema();
+
+        // Skip if no way to figure out schema (usually both expression schema and
+        // user defined schema are null)
+        if (foreach.getSchema()==null) {
+            return;
+        }
+
+        if (mUserDefinedSchemas==null) {
+            return;
+        }
+
+        boolean hasUserDefinedSchema = false;
+        for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) {
+            if (mUserDefinedSchema!=null) {
+                hasUserDefinedSchema = true;
+                break;
+            }
+        }
+
+        if (!hasUserDefinedSchema) {
+            return;
+        }
+
+        if (mExpSchemas.size()!=mUserDefinedSchemas.size()) {
+            throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() +
+                    " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas",
+                    0, generate.getLocation());
+        }
+
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOForEach casterForEach = new LOForEach(plan);
+        casterForEach.setInnerPlan(innerPlan);
+        casterForEach.setAlias(foreach.getAlias());
+
+        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+        LOGenerate gen = new LOGenerate(innerPlan, exps, null);
+        innerPlan.add(gen);
+
+        int index = 0;
+        boolean needCast = false;
+        for(int i=0;i<mExpSchemas.size();i++) {
+            LogicalSchema mExpSchema = mExpSchemas.get(i);
+            LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i);
+
+            // Use user defined schema to cast, this is the prevailing use case
+            if (mExpSchema==null) {
+                for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) {
+                    if (fs.type==DataType.NULL||fs.type==DataType.BYTEARRAY) {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    } else {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, true, fs);
+                        needCast = true;
+                    }
+                    index++;
+                }
+                continue;
+            }
+
+            // No user defined schema, no need to cast
+            if (mUserDefinedSchema==null) {
+                for (int j=0;j<mExpSchema.size();j++) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    index++;
+                }
+                continue;
+            }
+
+            // Expression has schema, but user also define schema, need cast only
+            // when there is a mismatch
+            if (mExpSchema.size()!=mUserDefinedSchema.size()) {
+                throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() +
+                        " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation());
+            }
+
+            for (int j=0;j<mExpSchema.size();j++) {
+                LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j);
+                LogicalFieldSchema mUserDefinedFieldSchema = mUserDefinedSchema.getField(j);
+                if (mUserDefinedFieldSchema.type==DataType.NULL ||
+                    mUserDefinedFieldSchema.type==DataType.BYTEARRAY ||
+                    LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                } else {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema);
+                    needCast = true;
+                }
+                index++;
+            }
+        }
+
+        gen.setFlattenFlags(new boolean[index]);
+        if (needCast) {
+            // Insert the casterForEach into the plan and patch up the plan.
+            Operator next = plan.getSuccessors(foreach).get(0);
+            plan.insertBetween(foreach, casterForEach, next);
+
+            // Since the explict cast is now inserted after the original foreach,
+            // throwing away the user defined "types" but keeping the user
+            // defined names from the original foreach.
+            // 'generate' (LOGenerate) still holds the reference to this
+            // mUserDefinedSchemas
+            for( LogicalSchema mUserDefinedSchema : mUserDefinedSchemas ) {
+                if( mUserDefinedSchema != null ) {
+                    for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) {
+                        fs.type = DataType.NULL;
+                    }
+                }
+            }
+        }
+    }
+
+    private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen,
+            List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) {
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index);
+        innerPlan.add(innerLoad);
+        innerPlan.connect(innerLoad, gen);
+
+        LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+        ProjectExpression prj = new ProjectExpression(exp, index, 0, gen);
+        exp.add(prj);
+
+        if (needCaster) {
+            CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
+            exp.add(cast);
+        }
+        exps.add(exp);
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java?rev=1748451&r1=1748450&r2=1748451&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java Tue Jun 14 19:14:39 2016
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
 
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
@@ -36,15 +39,22 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.CastExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOStore;
@@ -311,4 +321,192 @@ public class TestPlanGeneration {
         assertNotNull(((PartitionedLoader)loLoad.getLoadFunc()).getPartFilter());
         assertEquals("b", loStore.getAlias());
     }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast1() throws Exception {
+        // A cast ForEach is inserted to take care of the user schema
+        String query = "A = load 'foo' as (a, b:int);\n" +
+                "B = foreach A generate a as a0:chararray, b as b:int;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        // before a0 is typecasted to chararray, it should be bytearray
+        assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(0).type);
+        // type of b should stay as int
+        assertEquals(DataType.INTEGER, loForEach1.getSchema().getField(1).type);
+        assertEquals("B", loForEach2.getAlias());
+        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        CastExpression cast = (CastExpression)generate.getOutputPlans().get(0).getSources().get(0);
+        Assert.assertTrue(cast.getType()==DataType.CHARARRAY);
+        assertEquals(loForEach2.getSchema().getField(0).alias, "a0");
+        Assert.assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast2() throws Exception {
+        // No additional cast ForEach will be inserted, but schema should match
+        String query = "A = load 'foo' as (a, b);\n" +
+                "B = foreach A generate (chararray)a as a0:chararray;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        assertEquals(loForEach.getSchema().getField(0).alias, "a0");
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast3() throws Exception {
+        // No additional cast ForEach will be inserted, but schema should match
+        String query = "A = load 'foo' as (a, b);\n" +
+                "B = foreach A generate (chararray)a as a0:int;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        LOGenerate generate1 = (LOGenerate)loForEach1.getInnerPlan().getSinks().get(0);
+        CastExpression cast1 = (CastExpression)generate1.getOutputPlans().get(0).getSources().get(0);
+        Assert.assertTrue(cast1.getType()==DataType.CHARARRAY);
+        //before a0 is typecasted to int, it should be chararray
+        Assert.assertEquals(DataType.CHARARRAY, loForEach1.getSchema().getField(0).type);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        LOGenerate generate2 = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        CastExpression cast2 = (CastExpression)generate2.getOutputPlans().get(0).getSources().get(0);
+        Assert.assertTrue(cast2.getType()==DataType.INTEGER);
+        Assert.assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast4() throws Exception {
+        // No additional cast ForEach will be inserted
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name), nb2;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast5() throws Exception {
+        // cast ForEach will be inserted
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name:chararray), nb2 as nb2:chararray;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        // flattened "name" field should be bytearray before typecasted to  chararray
+        Assert.assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(1).type);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        Assert.assertTrue(generate.getOutputPlans().get(0).getSources().get(0) instanceof ProjectExpression);
+        CastExpression cast = (CastExpression)generate.getOutputPlans().get(1).getSources().get(0);
+        Assert.assertTrue(cast.getType()==DataType.CHARARRAY);
+        Assert.assertTrue(generate.getOutputPlans().get(2).getSources().get(0) instanceof ProjectExpression);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast6() throws Exception {
+        // no cast ForEach will be inserted
+        String query = "a = load 'foo' as (nb1:bag{(year,name)}, nb2);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name2), nb2;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        assertEquals(loForEach.getSchema().getField(1).alias, "name2");
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast7() throws Exception {
+        // no cast ForEach will be inserted, since we don't know the size of outputs
+        // in first inner plan
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:bag{});\n" +
+                "b = foreach a generate flatten(nb1), flatten(nb2) as (year, name);\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testAsType1() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        Data data = Storage.resetData(pig);
+        data.set("input", tuple(0.1), tuple(1.2), tuple(2.3));
+
+        String query =
+            "A = load 'input' USING mock.Storage() as (a1:double);\n"
+            + "B = FOREACH A GENERATE a1 as (a2:int);\n"
+            + "store B into 'out' using mock.Storage;" ;
+
+        Util.registerMultiLineQuery(pig, query);
+        List<Tuple> list = data.get("out");
+        // Without PIG-2315, this failed with (0.1), (1.2), (2.3)
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {"(0)", "(1)", "(2)"});
+        Util.checkQueryOutputsAfterSort(list, expectedRes);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testAsType2() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        Data data = Storage.resetData(pig);
+        data.set("input", tuple("a"), tuple("b"), tuple("c"));
+
+        String query =
+            "A = load 'input' USING mock.Storage(); \n"
+            + "A2 = FOREACH A GENERATE 12345 as (a2:chararray); \n"
+            + "B = load 'input' USING mock.Storage(); \n"
+            + "B2 = FOREACH A GENERATE '12345' as (b2:chararray); \n"
+            + "C = union A2, B2;\n"
+            + "D = distinct C;\n"
+            + "store D into 'out' using mock.Storage;" ;
+
+        Util.registerMultiLineQuery(pig, query);
+        List<Tuple> list = data.get("out");
+        // Without PIG-2315, this produced TWO 12345.
+        // One by chararray and another by int.
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {"('12345')"});
+        Util.checkQueryOutputsAfterSort(list, expectedRes);
+    }
 }