You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/07/03 22:36:16 UTC

svn commit: r1356921 [4/4] - in /pig/trunk: ./ conf/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expr...

Added: pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java (added)
+++ pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.BinInterSedes;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+@InterfaceAudience.Private
+public class SedesHelper {
+    private static final BinInterSedes pigSerializer = new BinInterSedes();
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Writes a byte array as a one-byte type marker, a length field and then
+     * the raw bytes. The marker and the width of the length field depend on the
+     * array length: a single byte when the length is below
+     * {@link BinInterSedes#UNSIGNED_BYTE_MAX}, a short when it is below
+     * {@link BinInterSedes#UNSIGNED_SHORT_MAX}, and an int otherwise.
+     *
+     * @param out destination to write to
+     * @param buf bytes to serialize; must not be null
+     * @throws IOException if {@code out} fails while writing
+     */
+    public static void writeBytes(DataOutput out, byte[] buf) throws IOException {
+        final int length = buf.length;
+        final boolean fitsInByte = length < BinInterSedes.UNSIGNED_BYTE_MAX;
+        final boolean fitsInShort = !fitsInByte && length < BinInterSedes.UNSIGNED_SHORT_MAX;
+
+        if (fitsInByte) {
+            out.writeByte(BinInterSedes.TINYBYTEARRAY);
+            out.writeByte(length);
+        } else if (fitsInShort) {
+            out.writeByte(BinInterSedes.SMALLBYTEARRAY);
+            out.writeShort(length);
+        } else {
+            out.writeByte(BinInterSedes.BYTEARRAY);
+            out.writeInt(length);
+        }
+        out.write(buf);
+    }
+
+    /**
+     * Reads a byte array whose type marker has already been consumed by the
+     * caller. The marker selects how the length field is read: an unsigned
+     * byte, an unsigned short, or an int.
+     *
+     * <p>For any other marker no length is read and an empty array is
+     * returned.</p>
+     *
+     * @param in source to read from
+     * @param type the type marker that preceded the length field
+     * @return the bytes that followed the length field
+     * @throws IOException if {@code in} fails or ends prematurely
+     */
+    public static byte[] readBytes(DataInput in, byte type) throws IOException {
+        final int length;
+        if (type == BinInterSedes.TINYBYTEARRAY) {
+            length = in.readUnsignedByte();
+        } else if (type == BinInterSedes.SMALLBYTEARRAY) {
+            length = in.readUnsignedShort();
+        } else if (type == BinInterSedes.BYTEARRAY) {
+            length = in.readInt();
+        } else {
+            length = 0;
+        }
+        final byte[] data = new byte[length];
+        in.readFully(data);
+        return data;
+    }
+
+    /**
+     * Writes a string as a one-byte type marker followed by its encoded form.
+     * Short strings are written with {@link DataOutput#writeUTF(String)};
+     * longer ones are written as an int byte count followed by the bytes
+     * obtained with the {@link BinInterSedes#UTF8} charset.
+     *
+     * @param out destination to write to
+     * @param s string to serialize; must not be null
+     * @throws IOException if {@code out} fails while writing
+     */
+    public static void writeChararray(DataOutput out, String s) throws IOException {
+        // writeUTF uses modified UTF-8, where one char may need up to 3 bytes,
+        // so the short form is only safe below UNSIGNED_SHORT_MAX / 3 chars.
+        final boolean needsLongForm = s.length() >= BinInterSedes.UNSIGNED_SHORT_MAX / 3;
+        if (needsLongForm) {
+            final byte[] encoded = s.getBytes(BinInterSedes.UTF8);
+            out.writeByte(BinInterSedes.CHARARRAY);
+            out.writeInt(encoded.length);
+            out.write(encoded);
+            return;
+        }
+        out.writeByte(BinInterSedes.SMALLCHARARRAY);
+        out.writeUTF(s);
+    }
+
+    /**
+     * Reads a string whose type marker has already been consumed by the
+     * caller. A {@link BinInterSedes#SMALLCHARARRAY} marker means the string
+     * is read with {@link DataInput#readUTF()}; any other marker means an int
+     * byte count followed by that many bytes, decoded with the
+     * {@link BinInterSedes#UTF8} charset.
+     *
+     * @param in source to read from
+     * @param type the type marker that preceded the string
+     * @return the decoded string
+     * @throws IOException if {@code in} fails or ends prematurely
+     */
+    public static String readChararray(DataInput in, byte type) throws IOException {
+        if (type != BinInterSedes.SMALLCHARARRAY) {
+            final byte[] encoded = new byte[in.readInt()];
+            in.readFully(encoded);
+            return new String(encoded, BinInterSedes.UTF8);
+        }
+        return in.readUTF();
+    }
+
+    /**
+     * Writes a tuple as a one-byte type marker, a field count and then every
+     * field in order via {@link BinInterSedes#writeDatum}. The marker and the
+     * width of the field count depend on the tuple size: a single byte when
+     * the size is below {@link BinInterSedes#UNSIGNED_BYTE_MAX}, a short when
+     * it is below {@link BinInterSedes#UNSIGNED_SHORT_MAX}, and an int
+     * otherwise.
+     *
+     * @param out destination to write to
+     * @param t tuple to serialize; must not be null
+     * @throws IOException if writing or reading a field fails
+     */
+    public static void writeGenericTuple(DataOutput out, Tuple t) throws IOException {
+        final int fieldCount = t.size();
+        final boolean fitsInByte = fieldCount < BinInterSedes.UNSIGNED_BYTE_MAX;
+        final boolean fitsInShort = !fitsInByte && fieldCount < BinInterSedes.UNSIGNED_SHORT_MAX;
+
+        if (fitsInByte) {
+            out.writeByte(BinInterSedes.TINYTUPLE);
+            out.writeByte(fieldCount);
+        } else if (fitsInShort) {
+            out.writeByte(BinInterSedes.SMALLTUPLE);
+            out.writeShort(fieldCount);
+        } else {
+            out.writeByte(BinInterSedes.TUPLE);
+            out.writeInt(fieldCount);
+        }
+
+        int field = 0;
+        while (field < fieldCount) {
+            pigSerializer.writeDatum(out, t.get(field));
+            field++;
+        }
+    }
+
+    /**
+     * Reads a tuple whose type marker has already been consumed by the caller.
+     * The field count is obtained from {@link BinInterSedes#getTupleSize}, and
+     * each field is then read in order with {@link BinInterSedes#readDatum}.
+     *
+     * @param in source to read from
+     * @param type the type marker that preceded the tuple
+     * @return a new tuple from the shared {@link TupleFactory} holding the fields read
+     * @throws IOException if reading fails
+     */
+    public static Tuple readGenericTuple(DataInput in, byte type) throws IOException {
+        final int fieldCount = pigSerializer.getTupleSize(in, type);
+        final Tuple result = mTupleFactory.newTuple(fieldCount);
+
+        int field = 0;
+        while (field < fieldCount) {
+            result.set(field, pigSerializer.readDatum(in));
+            field++;
+        }
+        return result;
+    }
+
+    /**
+     * Packs {@code v} followed by one additional boolean into bytes, eight
+     * values per byte, and writes them. Within a byte the earliest value ends
+     * up in the most significant used bit. A final group with fewer than eight
+     * values is not padded: its values occupy the low-order bits of the last
+     * byte.
+     *
+     * @param out destination to write to
+     * @param v values to pack; must not be null
+     * @param extra value written as if it were stored at index {@code v.length}
+     * @throws IOException if {@code out} fails while writing
+     */
+    public static void writeBooleanArray(DataOutput out, boolean[] v, boolean extra) throws IOException {
+        final int total = v.length + 1;
+        int start = 0;
+        while (start < total) {
+            final int end = Math.min(start + 8, total);
+            int packed = 0;
+            for (int pos = start; pos < end; pos++) {
+                // position v.length is the slot for the extra value
+                final boolean bit = (pos == v.length) ? extra : v[pos];
+                packed = (packed << 1) | (bit ? 1 : 0);
+            }
+            // writeByte only emits the low eight bits
+            out.writeByte(packed);
+            start += 8;
+        }
+    }
+
+    /**
+     * Packs {@code v} into bytes, eight values per byte, and writes them.
+     * Within a byte the earliest value ends up in the most significant used
+     * bit. A final group with fewer than eight values is not padded: its
+     * values occupy the low-order bits of the last byte. Nothing is written
+     * for an empty array.
+     *
+     * @param out destination to write to
+     * @param v values to pack; must not be null
+     * @throws IOException if {@code out} fails while writing
+     */
+    public static void writeBooleanArray(DataOutput out, boolean[] v) throws IOException {
+        int start = 0;
+        while (start < v.length) {
+            final int end = Math.min(start + 8, v.length);
+            int packed = 0;
+            for (int pos = start; pos < end; pos++) {
+                packed = (packed << 1) | (v[pos] ? 1 : 0);
+            }
+            // writeByte only emits the low eight bits
+            out.writeByte(packed);
+            start += 8;
+        }
+    }
+
+    /**
+     * Reads {@code size} booleans that were packed eight per byte, with the
+     * earliest value of each group in the most significant used bit and a
+     * short final group stored in the low-order bits of the last byte (the
+     * layout produced by {@code writeBooleanArray}).
+     *
+     * @param in source to read from
+     * @param size number of booleans to decode
+     * @return a new array of length {@code size}
+     * @throws IOException if {@code in} fails or ends prematurely
+     */
+    public static boolean[] readBooleanArray(DataInput in, int size) throws IOException {
+        boolean[] v = new boolean[size];
+        for (int chunk = 0; chunk < size; chunk += 8) {
+            // Mask to an unsigned value: with a signed byte a set high bit
+            // would make "% 2" yield -1 and sign-extend on every shift.
+            int decoding = in.readByte() & 0xFF;
+            int last = chunk + Math.min(7, size - chunk - 1);
+            // Stop at the start of this chunk so earlier chunks are not overwritten.
+            for (int i = last; i >= chunk; i--) {
+                v[i] = (decoding & 1) == 1;
+                decoding >>>= 1;
+            }
+        }
+        return v;
+    }
+
+    /**
+     * <p>Encodes signed and unsigned values using a common variable-length
+     * scheme, found for example in
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+     * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+     * but will use slightly more bytes to encode large values.</p>
+     *
+     * <p>Signed values are further encoded using so-called zig-zag encoding
+     * in order to make them "compatible" with variable-length encoding.</p>
+     *
+     * <p>This is taken from mahout-core, and is included to avoid having to pull
+     * in the entirety of Mahout.</p>
+     */
+    public static class Varint {
+
+        private Varint() {
+        }
+
+        /**
+         * Encodes a value using the variable-length encoding from
+         * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+         * Google Protocol Buffers</a>. It uses zig-zag encoding to efficiently
+         * encode signed values. If values are known to be nonnegative,
+         * {@link #writeUnsignedVarLong(long, DataOutput)} should be used.
+         *
+         * @param value value to encode
+         * @param out to write bytes to
+         * @throws IOException if {@link DataOutput} throws {@link IOException}
+         */
+        public static void writeSignedVarLong(long value, DataOutput out) throws IOException {
+            // Zig-zag: moves the sign into the lowest bit so small magnitudes stay short.
+            writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
+        }
+
+        /**
+         * Encodes a value using the variable-length encoding from
+         * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+         * Google Protocol Buffers</a>. Zig-zag is not used, so input should not be negative.
+         * If values can be negative, use {@link #writeSignedVarLong(long, DataOutput)}
+         * instead. This method treats negative input as a large unsigned value.
+         *
+         * @param value value to encode
+         * @param out to write bytes to
+         * @throws IOException if {@link DataOutput} throws {@link IOException}
+         */
+        public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException {
+            // Emit seven bits at a time, low bits first; 0x80 marks "more bytes follow".
+            while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+                out.writeByte(((int) value & 0x7F) | 0x80);
+                value >>>= 7;
+            }
+            out.writeByte((int) value & 0x7F);
+        }
+
+        /**
+         * Int variant of {@link #writeSignedVarLong(long, DataOutput)}.
+         *
+         * @param value value to encode
+         * @param out to write bytes to
+         * @throws IOException if {@link DataOutput} throws {@link IOException}
+         * @see #writeSignedVarLong(long, DataOutput)
+         */
+        public static void writeSignedVarInt(int value, DataOutput out) throws IOException {
+            // Zig-zag: moves the sign into the lowest bit so small magnitudes stay short.
+            writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
+        }
+
+        /**
+         * Int variant of {@link #writeUnsignedVarLong(long, DataOutput)}.
+         *
+         * @param value value to encode
+         * @param out to write bytes to
+         * @throws IOException if {@link DataOutput} throws {@link IOException}
+         * @see #writeUnsignedVarLong(long, DataOutput)
+         */
+        public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException {
+            while ((value & 0xFFFFFF80) != 0) {
+                out.writeByte((value & 0x7F) | 0x80);
+                value >>>= 7;
+            }
+            out.writeByte(value & 0x7F);
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decoded value
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *  within 10 bytes
+         * @see #writeSignedVarLong(long, DataOutput)
+         */
+        public static long readSignedVarLong(DataInput in) throws IOException {
+            long raw = readUnsignedVarLong(in);
+            // This undoes the trick in writeSignedVarLong()
+            long temp = (((raw << 63) >> 63) ^ raw) >> 1;
+            // This extra step lets us deal with the largest signed values by treating
+            // negative results from read unsigned methods as unsigned values.
+            // Must re-flip the top bit if the original read value had it set.
+            return temp ^ (raw & (1L << 63));
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decoded value
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *  within 10 bytes
+         * @see #writeUnsignedVarLong(long, DataOutput)
+         */
+        public static long readUnsignedVarLong(DataInput in) throws IOException {
+            long value = 0L;
+            int i = 0;
+            long b;
+            while (((b = in.readByte()) & 0x80L) != 0) {
+                value |= (b & 0x7F) << i;
+                i += 7;
+                // 9 continuation bytes (63 bits) plus one final byte cover all 64 bits.
+                if (i > 63) {
+                    throw new IllegalArgumentException("Variable length quantity is too long");
+                }
+            }
+            return value | (b << i);
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decoded value
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *  after 5 bytes have been read
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @see #readSignedVarLong(DataInput)
+         */
+        public static int readSignedVarInt(DataInput in) throws IOException {
+            int raw = readUnsignedVarInt(in);
+            // This undoes the trick in writeSignedVarInt()
+            int temp = (((raw << 31) >> 31) ^ raw) >> 1;
+            // This extra step lets us deal with the largest signed values by treating
+            // negative results from read unsigned methods as unsigned values.
+            // Must re-flip the top bit if the original read value had it set.
+            return temp ^ (raw & (1 << 31));
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decoded value
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *  after 5 bytes have been read
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @see #readUnsignedVarLong(DataInput)
+         */
+        public static int readUnsignedVarInt(DataInput in) throws IOException {
+            int value = 0;
+            int i = 0;
+            int b;
+            while (((b = in.readByte()) & 0x80) != 0) {
+                value |= (b & 0x7F) << i;
+                i += 7;
+                // 4 continuation bytes (28 bits) plus one final byte cover all 32 bits.
+                // A fifth continuation byte would need a shift of 35, which int
+                // arithmetic reduces modulo 32 and would silently corrupt the value.
+                if (i > 28) {
+                    throw new IllegalArgumentException("Variable length quantity is too long");
+                }
+            }
+            return value | (b << i);
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java (added)
+++ pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+public class StructuresHelper {
+    // Not instantiable: this class only groups the nested helper types below.
+    private StructuresHelper() {
+    }
+
+    /**
+     * This encapsulates a Schema and allows it to be used in such a way that
+     * any aliases are ignored in equality. A null Schema is accepted by
+     * {@link #hashCode()}, {@link #equals(Object)} and {@link #toString()}.
+     */
+    public static class SchemaKey {
+        private final Schema s;
+
+        public SchemaKey(Schema s) {
+            this.s = s;
+        }
+
+        // Per-position multipliers for the field hashes. The last entry, 1133,
+        // is not prime (11 * 103); it is kept as-is so hash values do not change.
+        private static final int[] primeList = { 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37,
+                                                 41, 43, 47, 53, 59, 61, 67, 71, 73, 79,
+                                                 83, 89, 97, 101, 103, 107, 109, 1133};
+
+        /**
+         * The hashcode logic is taken from the Schema class, including how fields
+         * are handled. The difference is that aliases are ignored.
+         */
+        @Override
+        public int hashCode() {
+            return hashCode(s);
+        }
+
+        /**
+         * Computes an alias-independent hash of a Schema from the type of each
+         * field and, recursively, from any nested schema.
+         *
+         * @param s schema to hash; may be null
+         * @return the hash, or 0 when {@code s} is null
+         */
+        public static int hashCode(Schema s) {
+            if (s == null) {
+                return 0;
+            }
+            int idx = 0 ;
+            int hashCode = 0 ;
+            for(FieldSchema fs : s.getFields()) {
+                // the multiplier cycles through primeList by field position
+                hashCode += hashCode(fs) * (primeList[idx % primeList.length]) ;
+                idx++ ;
+            }
+            return hashCode ;
+        }
+
+        // Hash of one field: its type plus its nested schema, never its alias.
+        private static int hashCode(FieldSchema fs) {
+            return (fs.type * 17) + ( (fs.schema == null? 0 : hashCode(fs.schema)) * 23 );
+        }
+
+        /**
+         * Two keys are equal when both wrap null, or when Schema.equals reports
+         * the wrapped schemas as equal.
+         * NOTE(review): the trailing {@code false, true} arguments presumably
+         * mean "do not relax inner schemas, do relax aliases" -- confirm
+         * against Schema.equals.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof SchemaKey)) {
+                return false;
+            }
+            Schema other = ((SchemaKey)o).get();
+            return (s == null && other == null) || Schema.equals(s, other, false, true);
+        }
+
+        /**
+         * @return the wrapped Schema, possibly null
+         */
+        public Schema get() {
+            return s;
+        }
+
+        /**
+         * @return the wrapped Schema's string form, or "null" when no Schema is wrapped
+         */
+        @Override
+        public String toString() {
+            // String.valueOf avoids the NullPointerException s.toString() would raise
+            return String.valueOf(s);
+        }
+    }
+
+    /**
+     * This is a helper class which makes it easy to have pairs of values,
+     * and to use them as keys and values in Maps.
+     */
+    public static class Pair<T1, T2> {
+        private final T1 t1;
+        private final T2 t2;
+
+        public Pair(T1 t1, T2 t2) {
+            this.t1 = t1;
+            this.t2 = t2;
+        }
+
+        public T1 getFirst() {
+            return t1;
+        }
+
+        public T2 getSecond() {
+            return t2;
+        }
+
+        public static <A,B> Pair<A,B> make(A t1, B t2) {
+            return new Pair<A,B>(t1, t2);
+        }
+
+        @Override
+        public int hashCode() {
+            return (t1 == null ? 0 : t1.hashCode()) + (t2 == null ? 0 : 31 * t2.hashCode());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Pair<?,?>)) {
+                return false;
+            }
+            Pair<?,?> pr = (Pair<?,?>)o;
+            if (t1 == null) {
+                return pr.getFirst() == null;
+            }
+            if (!t1.equals(pr.getFirst())) {
+                return false;
+            }
+            if (t2 == null) {
+                return pr.getSecond() == null;
+            }
+            return t2.equals(pr.getSecond());
+        }
+
+        @Override
+        public String toString() {
+            return new StringBuilder()
+                    .append("[")
+                    .append(t1)
+                    .append(",")
+                    .append(t2)
+                    .append("]")
+                    .toString();
+        }
+    }
+
+    public static class Triple<T1, T2, T3> {
+        private final T1 t1;
+        private final T2 t2;
+        private final T3 t3;
+
+        public Triple(T1 t1, T2 t2, T3 t3) {
+            this.t1 = t1;
+            this.t2 = t2;
+            this.t3 = t3;
+        }
+
+        public T1 getFirst() {
+            return t1;
+        }
+
+        public T2 getSecond() {
+            return t2;
+        }
+
+        public T3 getThird() {
+            return t3;
+        }
+
+        public static <A,B,C> Triple<A,B,C> make(A t1, B t2, C t3) {
+            return new Triple<A,B,C>(t1, t2, t3);
+        }
+
+        @Override
+        public int hashCode() {
+            return (t1 == null ? 0 : t1.hashCode())
+                 + (t2 == null ? 0 : 31 * t2.hashCode())
+                 + (t3 == null ? 0 : 527 * t3.hashCode());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Triple<?,?,?>)) {
+                return false;
+            }
+            Triple<?,?,?> tr = (Triple<?,?,?>)o;
+            if (t1 == null) {
+                return tr.getFirst() == null;
+            }
+            if (!t1.equals(tr.getFirst())) {
+                return false;
+            }
+            if (t2 == null) {
+                return tr.getSecond() == null;
+            }
+            if (!t2.equals(tr.getSecond())) {
+                return false;
+            }
+            if (t3 == null) {
+                return tr.getThird() ==null;
+            }
+            if (!t3.equals(tr.getThird())) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return new StringBuilder()
+                    .append("[")
+                    .append(t1)
+                    .append(",")
+                    .append(t2)
+                    .append(",")
+                    .append(t3)
+                    .append("]")
+                    .toString();
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul  3 20:36:09 2012
@@ -242,9 +242,8 @@ public class PigContext implements Seria
      * calls: addScriptFile(path, new File(path)), ensuring that a given path is
      * added to the jar at most once.
      * @param path
-     * @throws MalformedURLException
      */
-    public void addScriptFile(String path) throws MalformedURLException {
+    public void addScriptFile(String path) {
         if (path != null) {
             aliasedScriptFiles.put(path.replaceFirst("^/", ""), new File(path));
         }

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Tue Jul  3 20:36:09 2012
@@ -100,9 +100,7 @@ public class InterRecordReader extends R
           }
           if(b == -1) return false;
           b = in.read();
-          if(b != BinInterSedes.TINYTUPLE && 
-                  b != BinInterSedes.SMALLTUPLE &&
-                  b != BinInterSedes.TUPLE &&
+          if(!BinInterSedes.isTupleByte((byte) b) &&
                   b != -1) {
               continue;
           }

Modified: pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java Tue Jul  3 20:36:09 2012
@@ -17,6 +17,10 @@
  */
 package org.apache.pig.impl.io;
 
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
@@ -26,12 +30,9 @@ import org.apache.pig.data.TupleFactory;
 public class NullableTuple extends PigNullableWritable {
 
     private TupleFactory mFactory = null;
+    private static final BinInterSedes bis = new BinInterSedes();
 
     public NullableTuple() {
-        if (mFactory == null) {
-            mFactory = TupleFactory.getInstance();
-        }
-        mValue = mFactory.newTuple();
     }
 
     /**
@@ -44,4 +45,15 @@ public class NullableTuple extends PigNu
     public Object getValueAsPigType() {
         return isNull() ? null : (Tuple)mValue;
     }
+
+    /**
+     * Deserializes this object from {@code in}: a boolean null flag, then (only
+     * when the flag is false) a tuple read with {@link BinInterSedes#readTuple},
+     * and finally one byte that is passed to {@code setIndex}.
+     *
+     * @param in source to read from
+     * @throws IOException if reading fails
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        final boolean isNullValue = in.readBoolean();
+        setNull(isNullValue);
+        if (isNullValue == false) {
+            // the tuple payload is only present for non-null values
+            mValue = bis.readTuple(in);
+        }
+        final byte index = in.readByte();
+        setIndex(index);
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Tue Jul  3 20:36:09 2012
@@ -69,7 +69,6 @@ import org.apache.pig.newplan.Dependency
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.logical.Util;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -537,6 +536,7 @@ public class ExpToPhyTranslationVisitor 
             Operator refOp = ((ScalarExpression)op).getImplicitReferencedOperator();
             ((POUserFunc)p).setReferencedOperator( logToPhyMap.get( refOp ) );
         }
+
     }
     
     @Override

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Tue Jul  3 20:36:09 2012
@@ -25,9 +25,10 @@ import java.util.Properties;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.ResourceSchema;
 import org.apache.pig.builtin.Nondeterministic;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFrontend;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -193,12 +194,18 @@ public class UserFuncExpression extends 
         
         ef.setUDFContextSignature(signature);
         Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass());
-        if(Util.translateSchema(inputSchema)!=null)
-    		props.put("pig.evalfunc.inputschema."+signature, Util.translateSchema(inputSchema));
+        Schema translatedInputSchema = Util.translateSchema(inputSchema);
+        if(translatedInputSchema != null) {
+    		props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema);
+        }
         // Store inputSchema into the UDF context
-        ef.setInputSchema(Util.translateSchema(inputSchema));
+        ef.setInputSchema(translatedInputSchema);
+;
+        Schema udfSchema = ef.outputSchema(translatedInputSchema);
         
-        Schema udfSchema = ef.outputSchema(Util.translateSchema(inputSchema));
+        //TODO appendability should come from a setting
+        SchemaTupleFrontend.registerToGenerateIfPossible(translatedInputSchema, false, GenContext.UDF);
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, false, GenContext.UDF);
 
         if (udfSchema != null) {
             Schema.FieldSchema fs;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Jul  3 20:36:09 2012
@@ -25,8 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -57,6 +60,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFrontend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -64,6 +69,8 @@ import org.apache.pig.impl.builtin.GFCro
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -84,7 +91,10 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.parser.SourceLocation;
 
+import com.google.common.collect.Lists;
+
 public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
+    private static final Log LOG = LogFactory.getLog(LogToPhyTranslationVisitor.class);
     
     public LogToPhyTranslationVisitor(OperatorPlan plan) throws FrontendException {
         super(plan, new DependencyOrderWalker(plan));
@@ -190,7 +200,7 @@ public class LogToPhyTranslationVisitor 
         String scope = DEFAULT_SCOPE;
 //        System.err.println("Entering Filter");
         POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), filter.getRequestedParallelisam());
+                .getNextNodeId(scope)), filter.getRequestedParallelism());
         poFilter.addOriginalLocation(filter.getAlias(), filter.getLocation());
         poFilter.setResultType(DataType.BAG);
         currentPlan.add(poFilter);
@@ -258,14 +268,14 @@ public class LogToPhyTranslationVisitor 
         POSort poSort;
         if (sort.getUserFunc() == null) {
             poSort = new POSort(new OperatorKey(scope, nodeGen
-                    .getNextNodeId(scope)), sort.getRequestedParallelisam(), null,
+                    .getNextNodeId(scope)), sort.getRequestedParallelism(), null,
                     sortPlans, sort.getAscendingCols(), null);
         } else {
             POUserComparisonFunc comparator = new POUserComparisonFunc(new OperatorKey(
                     scope, nodeGen.getNextNodeId(scope)), sort
-                    .getRequestedParallelisam(), null, sort.getUserFunc());
+                    .getRequestedParallelism(), null, sort.getUserFunc());
             poSort = new POSort(new OperatorKey(scope, nodeGen
-                    .getNextNodeId(scope)), sort.getRequestedParallelisam(), null,
+                    .getNextNodeId(scope)), sort.getRequestedParallelism(), null,
                     sortPlans, sort.getAscendingCols(), comparator);
         }
         poSort.addOriginalLocation(sort.getAlias(), sort.getLocation());
@@ -300,7 +310,7 @@ public class LogToPhyTranslationVisitor 
         String scope = DEFAULT_SCOPE;
         List<Operator> inputs = cross.getPlan().getPredecessors(cross);
                 if (cross.isNested()) {
-            POCross physOp = new POCross(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+            POCross physOp = new POCross(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
             physOp.addOriginalLocation(physOp.getAlias(), physOp.getOriginalLocations());
             currentPlan.add(physOp);
             physOp.setResultType(DataType.BAG);
@@ -318,10 +328,10 @@ public class LogToPhyTranslationVisitor 
         } else {
             POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
                     scope, nodeGen.getNextNodeId(scope)), cross
-                    .getRequestedParallelisam());
+                    .getRequestedParallelism());
             poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
             POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
-                    .getNextNodeId(scope)), cross.getRequestedParallelisam());
+                    .getNextNodeId(scope)), cross.getRequestedParallelism());
             poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
             currentPlan.add(poGlobal);
             currentPlan.add(poPackage);
@@ -334,12 +344,12 @@ public class LogToPhyTranslationVisitor 
                 
                 for (Operator op : inputs) {
                     PhysicalPlan fep1 = new PhysicalPlan();
-                    ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+                    ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
                     ce1.setValue(inputs.size());
                     ce1.setResultType(DataType.INTEGER);
                     fep1.add(ce1);
                     
-                    ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+                    ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
                     ce2.setValue(count);
                     ce2.setResultType(DataType.INTEGER);
                     fep1.add(ce2);
@@ -349,7 +359,7 @@ public class LogToPhyTranslationVisitor 
                     ce1.setValue(ce1val);
                     ce1.setResultType(DataType.TUPLE);*/
                     
-                    POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
+                    POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
                     gfc.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     gfc.setResultType(DataType.BAG);
                     fep1.addAsLeaf(gfc);
@@ -359,7 +369,7 @@ public class LogToPhyTranslationVisitor 
                     fep1.connect(ce2, gfc);*/
                     
                     PhysicalPlan fep2 = new PhysicalPlan();
-                    POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+                    POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
                     feproj.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     feproj.setResultType(DataType.TUPLE);
                     feproj.setStar(true);
@@ -367,19 +377,19 @@ public class LogToPhyTranslationVisitor 
                     fep2.add(feproj);
                     List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
                     
-                    POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+                    POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
                     fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     currentPlan.add(fe);
                     currentPlan.connect(logToPhyMap.get(op), fe);
                     
                     POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
                             scope, nodeGen.getNextNodeId(scope)), cross
-                            .getRequestedParallelisam());
+                            .getRequestedParallelism());
                     physOp.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
                     for(int i=0;i<inputs.size();i++){
                         PhysicalPlan lrp1 = new PhysicalPlan();
-                        POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+                        POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), i);
                         lrproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
                         lrproj1.setOverloaded(false);
                         lrproj1.setResultType(DataType.INTEGER);
@@ -420,7 +430,7 @@ public class LogToPhyTranslationVisitor 
             List<Boolean> flattenLst = new ArrayList<Boolean>();
             for(int i=1;i<=count;i++){
                 PhysicalPlan fep1 = new PhysicalPlan();
-                POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+                POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), i);
                 feproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
                 feproj1.setResultType(DataType.BAG);
                 feproj1.setOverloaded(false);
@@ -429,7 +439,7 @@ public class LogToPhyTranslationVisitor 
                 flattenLst.add(true);
             }
             
-            POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+            POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
             fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
             currentPlan.add(fe);
             try{
@@ -596,7 +606,7 @@ public class LogToPhyTranslationVisitor 
             flattenList.add(fl);
         }
         POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), foreach.getRequestedParallelisam(), innerPlans, flattenList);
+                .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList);
         poFE.addOriginalLocation(foreach.getAlias(), foreach.getLocation());
         poFE.setResultType(DataType.BAG);
         logToPhyMap.put(foreach, poFE);
@@ -821,7 +831,7 @@ public class LogToPhyTranslationVisitor 
         }
         
         POMergeCogroup poCogrp = new POMergeCogroup(new OperatorKey(
-                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),inpPOs,innerLRs,relationalOp.getRequestedParallelisam());
+                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),inpPOs,innerLRs,relationalOp.getRequestedParallelism());
         return poCogrp;
     }
     
@@ -881,7 +891,7 @@ public class LogToPhyTranslationVisitor 
         boolean[] innerFlags = loj.getInnerFlags();
         String alias = loj.getAlias();
         SourceLocation location = loj.getLocation();
-        int parallel = loj.getRequestedParallelisam();
+        int parallel = loj.getRequestedParallelism();
         
         for (int i=0;i<inputs.size();i++) {
             Operator op = inputs.get(i);
@@ -952,6 +962,27 @@ public class LogToPhyTranslationVisitor 
             logToPhyMap.put(loj, skj);
         }
         else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
+            List<Schema> inputSchemas = Lists.newArrayListWithCapacity(inputs.size());
+            List<Schema> keySchemas = Lists.newArrayListWithCapacity(inputs.size());
+            // For each join input: register its value schema, then (if every key type is simple) its key schema, under FR_JOIN.
+            outer: for (int i = 0; i < inputs.size(); i++) {
+                Schema toGen = Schema.getPigSchema(new ResourceSchema(((LogicalRelationalOperator)inputs.get(i)).getSchema())); // NOTE(review): the merge-join branch null-checks getSchema() before this same conversion; here it is not checked -- confirm it cannot be null
+                // This registers the value piece
+                SchemaTupleFrontend.registerToGenerateIfPossible(toGen, false, GenContext.FR_JOIN);
+                inputSchemas.add(toGen);
+                // Build the key schema for input i from its key types; the fields are added without aliases.
+                Schema keyToGen = new Schema();
+                for (Byte byt : keyTypes.get(i)) {
+                    // We cannot generate any nested code because that information is thrown away
+                    if (byt == null || DataType.isComplex(byt.byteValue())) {
+                        continue outer; // NOTE(review): inputSchemas already has an entry for i but keySchemas gets none, so keySchemas can end up shorter than inputs -- confirm the consumer tolerates that
+                    }
+                    keyToGen.add(new FieldSchema(null, byt));
+                }
+                // Reached only when every key type of input i was non-null and non-complex.
+                SchemaTupleFrontend.registerToGenerateIfPossible(keyToGen, false, GenContext.FR_JOIN);
+                keySchemas.add(keyToGen);
+            }
             
             int fragment = 0;
             POFRJoin pfrj;
@@ -990,8 +1021,17 @@ public class LogToPhyTranslationVisitor 
                     }
                 }
                 
-                pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),parallel,
-                                            inp, ppLists, keyTypes, null, fragment, isLeftOuter, nullTuple);
+                pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
+                                        parallel,
+                                        inp,
+                                        ppLists,
+                                        keyTypes,
+                                        null,
+                                        fragment,
+                                        isLeftOuter,
+                                        nullTuple,
+                                        inputSchemas,
+                                        keySchemas);
                 pfrj.addOriginalLocation(alias, location);
             } catch (ExecException e1) {
                 int errCode = 2058;
@@ -1017,12 +1057,46 @@ public class LogToPhyTranslationVisitor 
             boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ; 
 
             if(usePOMergeJoin){
+                // We register the merge join schema information for code generation
+                LogicalSchema logicalSchema = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+                Schema leftSchema = null;
+                if (logicalSchema != null) {
+                    leftSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
+                }
+                logicalSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+                Schema rightSchema = null;
+                if (logicalSchema != null) {
+                    rightSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
+                }
+                logicalSchema = loj.getSchema();
+                Schema mergedSchema = null;
+                if (logicalSchema != null) {
+                    mergedSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
+                }
+
+                if (leftSchema != null) {
+                    SchemaTupleFrontend.registerToGenerateIfPossible(leftSchema, false, GenContext.MERGE_JOIN);
+                }
+                if (rightSchema != null) {
+                    SchemaTupleFrontend.registerToGenerateIfPossible(rightSchema, false, GenContext.MERGE_JOIN);
+                }
+                if (mergedSchema != null) {
+                    SchemaTupleFrontend.registerToGenerateIfPossible(mergedSchema, false, GenContext.MERGE_JOIN);
+                }
+
                 // inner join on two sorted inputs. We have less restrictive 
                 // implementation here in a form of POMergeJoin which doesn't 
                 // require loaders to implement collectable interface.
                 try {
                     smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
-                                            parallel,inp,joinPlans,keyTypes, loj.getJoinType());
+                                            parallel,
+                                            inp,
+                                            joinPlans,
+                                            keyTypes,
+                                            loj.getJoinType(),
+                                            leftSchema,
+                                            rightSchema,
+                                            mergedSchema);
                 }
                 catch (PlanException e) {
                     int errCode = 2042;
@@ -1083,11 +1157,11 @@ public class LogToPhyTranslationVisitor 
             boolean[] innerFlags, MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
 
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
-                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelisam());
+                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelism());
         poGlobal.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
         poGlobal.setCustomPartitioner(customPartitioner);
         POPackage poPackage = new POPackage(new OperatorKey(DEFAULT_SCOPE, nodeGen
-                .getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelisam());
+                .getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelism());
         poPackage.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
         currentPlan.add(poGlobal);
         currentPlan.add(poPackage);
@@ -1107,7 +1181,7 @@ public class LogToPhyTranslationVisitor 
             Operator op = inputs.get(i);
             List<LogicalExpressionPlan> plans = innerPlans.get(i);
             POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
-                    DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelisam());
+                    DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelism());
             physOp.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
             List<PhysicalPlan> exprPlans = translateExpressionPlans(relationalOp, plans);
             try {
@@ -1198,7 +1272,7 @@ public class LogToPhyTranslationVisitor 
     @Override
     public void visit(LOUnion loUnion) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-        POUnion physOp = new POUnion(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loUnion.getRequestedParallelisam());
+        POUnion physOp = new POUnion(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loUnion.getRequestedParallelism());
         physOp.addOriginalLocation(loUnion.getAlias(), loUnion.getLocation());
         currentPlan.add(physOp);
         physOp.setResultType(DataType.BAG);
@@ -1220,7 +1294,7 @@ public class LogToPhyTranslationVisitor 
     @Override
     public void visit(LODistinct loDistinct) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-        PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelisam());
+        PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelism());
         physOp.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
         currentPlan.add(physOp);
         physOp.setResultType(DataType.BAG);
@@ -1241,7 +1315,7 @@ public class LogToPhyTranslationVisitor 
     public void visit(LOLimit loLimit) throws FrontendException {
         String scope = DEFAULT_SCOPE;
         POLimit poLimit = new POLimit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
-                loLimit.getRequestedParallelisam());
+                loLimit.getRequestedParallelism());
         poLimit.setLimit(loLimit.getLimit());
         poLimit.addOriginalLocation(loLimit.getAlias(), loLimit.getLocation());
         poLimit.setResultType(DataType.BAG);
@@ -1279,7 +1353,7 @@ public class LogToPhyTranslationVisitor 
     public void visit(LOSplit loSplit) throws FrontendException {
         String scope = DEFAULT_SCOPE;
         POSplit physOp = new POSplit(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), loSplit.getRequestedParallelisam());
+                .getNextNodeId(scope)), loSplit.getRequestedParallelism());
         physOp.addOriginalLocation(loSplit.getAlias(), loSplit.getLocation());
         FileSpec splStrFile;
         try {
@@ -1332,7 +1406,7 @@ public class LogToPhyTranslationVisitor 
         String scope = DEFAULT_SCOPE;
 //        System.err.println("Entering Filter");
         POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), loSplitOutput.getRequestedParallelisam());
+                .getNextNodeId(scope)), loSplitOutput.getRequestedParallelism());
         poFilter.addOriginalLocation(loSplitOutput.getAlias(), loSplitOutput.getLocation());
         poFilter.setResultType(DataType.BAG);
         currentPlan.add(poFilter);
@@ -1404,34 +1478,6 @@ public class LogToPhyTranslationVisitor 
         
     }
 
-    private boolean validateMergeJoin(LOJoin loj) throws FrontendException{
-        
-        List<Operator> preds = plan.getPredecessors(loj);
-
-        int errCode = 1101;
-        String errMsg = "Merge Join must have exactly two inputs.";
-        if(preds.size() != 2)
-            throw new LogicalToPhysicalTranslatorException(errMsg+" Found: "+preds.size(),errCode);
-        
-        return mergeJoinValidator(preds,loj.getPlan());
-    }
-    
-    private boolean mergeJoinValidator(List<Operator> preds,OperatorPlan lp) throws FrontendException {
-        
-        int errCode = 1103;
-        String errMsg = "Merge join only supports Filter, Foreach, filter and Load as its predecessor. Found : ";
-        if(preds != null && !preds.isEmpty()){
-            for(Operator lo : preds){
-                if (!(lo instanceof LOFilter || lo instanceof LOLoad || lo instanceof LOForEach))
-                    throw new LogicalToPhysicalTranslatorException(errMsg, errCode);
-                // All is good at this level. Visit predecessors now.
-                mergeJoinValidator(lp.getPredecessors(lo),lp);
-            }
-        }
-        // We visited everything and all is good.
-        return true;
-    }
-    
     private void translateSoftLinks(Operator op) throws FrontendException {
         List<Operator> preds = op.getPlan().getSoftLinkPredecessors(op);
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java Tue Jul  3 20:36:09 2012
@@ -103,7 +103,7 @@ abstract public class LogicalRelationalO
      * Get the requestedParallelism for this operator.
      * @return requestedParallelsim
      */
-    public int getRequestedParallelisam() {
+    public int getRequestedParallelism() {
         return requestedParallelism;
     } 
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java Tue Jul  3 20:36:09 2012
@@ -88,7 +88,7 @@ public class GroupByConstParallelSetter 
                 Operator op = iter.next();
                 if (op instanceof LOCogroup) {
                     LOCogroup group = (LOCogroup)op;
-                    if(group.getRequestedParallelisam() > 1){
+                    if(group.getRequestedParallelism() > 1){
                         log.warn("Resetting parallism to 1 for the group/cogroup " +
                                 group.getAlias() +
                         " as the group by expressions returns a constant");

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Tue Jul  3 20:36:09 2012
@@ -198,7 +198,7 @@ public class MergeForEach extends Rule {
             LogicalPlan newForEachInnerPlan = new LogicalPlan();
             newForEach.setInnerPlan(newForEachInnerPlan);
             newForEach.setAlias(foreach2.getAlias());
-            newForEach.setRequestedParallelism(foreach1.getRequestedParallelisam());
+            newForEach.setRequestedParallelism(foreach1.getRequestedParallelism());
             List<LogicalExpressionPlan> newExpList = new ArrayList<LogicalExpressionPlan>();
             LOGenerate newGen = new LOGenerate(newForEachInnerPlan, newExpList, gen2.getFlattenFlags());
             newGen.setUserDefinedSchema(gen2.getUserDefinedSchema());

Added: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (added)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.InterRecordReader;
+import org.apache.pig.impl.io.InterRecordWriter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchemaTuple {
+    private Properties props;
+    private Configuration conf;
+    private PigContext pigContext;
+    private static final BinInterSedes bis = new BinInterSedes();
+    // Per-test setup: calls reset() on the SchemaTuple frontend and backend, then rebuilds props, conf and pigContext.
+    @Before
+    public void perTestInitialize() {
+        SchemaTupleFrontend.reset();
+        SchemaTupleBackend.reset();
+        // Fresh properties with SHOULD_GENERATE_KEY set to "true" (presumably enables code generation -- TODO confirm).
+        props = new Properties();
+        props.setProperty(SchemaTupleBackend.SHOULD_GENERATE_KEY, "true");
+        // Both the Hadoop Configuration and the local-mode PigContext are built from the same props.
+        conf = ConfigurationUtil.toConfiguration(props);
+        pigContext = new PigContext(ExecType.LOCAL, props);
+    }
+
+    @Test
+    public void testCompileAndResolve() throws Exception {
+        //frontend
+        Schema udfSchema = Utils.getSchemaFromString("a:int");
+        boolean isAppendable = false;
+        GenContext context = GenContext.UDF;
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        udfSchema = Utils.getSchemaFromString("a:long");
+        isAppendable = true;
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
+        isAppendable = false;
+        context = GenContext.LOAD;
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        udfSchema = Utils.getSchemaFromString("a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        udfSchema = Utils.getSchemaFromString("((a:int,b:int),(a:int,b:int),(a:int,b:int)),((a:int,b:int),(a:int,b:int),(a:int,b:int))");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        udfSchema = Utils.getSchemaFromString("a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double,"
+                +"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double,"
+                +"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double))");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+        // NOTE(review): the first literal below has no trailing comma, so it joins the next one as "booleanboolean" -- confirm this is intended
+        udfSchema = Utils.getSchemaFromString("boolean, boolean, boolean, boolean, boolean, boolean"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        udfSchema = Utils.getSchemaFromString("int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        isAppendable = true;
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        isAppendable = false;
+        udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        // this compiles and "ships"
+        SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
+
+        //backend
+        SchemaTupleBackend.initialize(conf, ExecType.LOCAL);
+
+        udfSchema = Utils.getSchemaFromString("a:int");
+        isAppendable = false;
+        context = GenContext.UDF;
+        SchemaTupleFactory tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        context = GenContext.MERGE_JOIN;
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        assertNull(tf);
+
+        udfSchema = Utils.getSchemaFromString("a:long");
+        context = GenContext.UDF;
+        isAppendable = true;
+
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        isAppendable = false;
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        assertNull(tf);
+
+        udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
+        isAppendable = false;
+        context = GenContext.LOAD;
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        udfSchema = Utils.getSchemaFromString("(a:chararray)");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        assertNull(tf);
+
+        udfSchema = Utils.getSchemaFromString("a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        udfSchema = Utils.getSchemaFromString("((a:int,b:int),(a:int,b:int),(a:int,b:int)),((a:int,b:int),(a:int,b:int),(a:int,b:int))");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        udfSchema = Utils.getSchemaFromString("a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double,"
+                +"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double,"
+                +"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double))");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        udfSchema = Utils.getSchemaFromString("boolean, boolean, boolean, boolean, boolean, boolean"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean,"
+                + "boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean, boolean");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        udfSchema = Utils.getSchemaFromString("int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))"
+                +"int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double,"
+                +"(int, long, chararray, boolean, bytearray, float, double))");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        isAppendable = true;
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+
+        isAppendable = false;
+        udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
+    }
+
+    /**
+     * Runs the battery of checks against a factory obtained for the given schema.
+     * The non-appendable checks always run; when {@code isAppendable} is set the
+     * factory must additionally hand out {@link AppendableSchemaTuple} instances
+     * and pass the appendable checks.
+     * @param tf the factory under test; asserted to be non-null
+     * @param udfSchema the schema the factory was requested for
+     * @param isAppendable whether the factory was requested as appendable
+     * @throws Exception if one of the delegated checks raises an error
+     */
+    private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {
+        assertNotNull(tf);
+
+        if (!isAppendable) {
+            testNotAppendable(tf, udfSchema);
+            return;
+        }
+
+        // Appendable path: type check first, then both suites in order.
+        assertTrue(tf.newTuple() instanceof AppendableSchemaTuple);
+        testNotAppendable(tf, udfSchema);
+        testAppendable(tf, udfSchema);
+    }
+
+    private void testAppendable(SchemaTupleFactory tf, Schema udfSchema) {
+        SchemaTuple<?> st = tf.newTuple();
+
+        st.append("woah");
+        assertEquals(udfSchema.size() + 1, st.size());
+
+    }
+
+    /**
+     * Runs the checks that apply to every tuple produced by the factory: schema
+     * and size agreement with {@code udfSchema}, all-null initial state,
+     * copy/compare behaviour, and the two serialization round trips.
+     * @param tf the factory under test
+     * @param udfSchema the schema the factory was requested for
+     * @throws Exception if one of the delegated checks raises an error
+     */
+    private void testNotAppendable(SchemaTupleFactory tf, Schema udfSchema) throws Exception {
+        SchemaTuple<?> st = tf.newTuple();
+        // The comparison result was previously discarded, so a schema mismatch
+        // could never fail the test; assert on it instead.
+        assertTrue(Schema.equals(udfSchema, st.getSchema(), false, true));
+
+        assertEquals(udfSchema.size(), st.size());
+
+        shouldAllBeNull(tf);
+
+        copyThenCompare(tf);
+
+        testSerDe(tf);
+        testInterStorageSerDe(tf);
+    }
+
+    /**
+     * Fills one tuple with data, copies it into a second tuple and checks that
+     * {@code equals} and {@code compareTo} agree before and after field 0 is
+     * nulled out on either side.
+     * @param tf the factory producing both tuples
+     * @throws ExecException if setting or reading a field fails
+     */
+    private void copyThenCompare(SchemaTupleFactory tf) throws ExecException {
+        SchemaTuple<?> st = tf.newTuple();
+        SchemaTuple<?> st2 = tf.newTuple();
+        fillWithData(st);
+        st2.set(st);
+        assertTrue(st.equals(st2));
+        // Expected value goes first so that a failure message reads correctly.
+        assertEquals(0, st.compareTo(st2));
+        st.set(0, null);
+        assertFalse(st.equals(st2));
+        // Only the sign of compareTo is specified, so do not pin the exact value.
+        assertTrue(st.compareTo(st2) < 0);
+        assertTrue(st.isNull(0));
+        st2.set(0, null);
+        assertTrue(st.equals(st2));
+        assertEquals(0, st.compareTo(st2));
+    }
+
+    /**
+     * Verifies that a tuple freshly produced by the factory holds nothing but
+     * nulls, both when iterated and when read by position.
+     * @param   tf the factory whose fresh tuple is inspected
+     * @throws  ExecException if a positional read fails
+     */
+    private void shouldAllBeNull(SchemaTupleFactory tf) throws ExecException {
+        Tuple fresh = tf.newTuple();
+
+        // First pass: the iterable view of the tuple.
+        for (Object field : fresh) {
+            assertNull(field);
+        }
+
+        // Second pass: positional access together with the explicit null flag.
+        int pos = 0;
+        while (pos < fresh.size()) {
+            assertNull(fresh.get(pos));
+            assertTrue(fresh.isNull(pos));
+            pos++;
+        }
+    }
+
+    /**
+     * Populates every field of the tuple according to the tuple's own schema.
+     * Tuple-typed fields receive a new SchemaTuple obtained for the nested
+     * schema and filled recursively; every other field receives a value from
+     * {@code randData}.
+     * @param st the tuple to populate
+     * @throws ExecException if a field cannot be set
+     */
+    private void fillWithData(SchemaTuple<?> st) throws ExecException {
+        List<FieldSchema> fields = st.getSchema().getFields();
+        for (int idx = 0; idx < fields.size(); idx++) {
+            FieldSchema field = fields.get(idx);
+            Object val;
+            if (field.type != DataType.TUPLE) {
+                val = randData(field);
+            } else {
+                SchemaTuple<?> nested = SchemaTupleFactory
+                        .getInstance(field.schema, false, GenContext.FORCE_LOAD)
+                        .newTuple();
+                fillWithData(nested);
+                val = nested;
+            }
+            st.set(idx, val);
+        }
+    }
+
+    // Shared source of pseudo-random test data; built with a fixed seed (100L).
+    private Random r = new Random(100L);
+
+    /**
+     * Produces a pseudo-random value matching the type of the given field.
+     * Bags are filled with fewer than 100 tuples, each holding fewer than 10
+     * random ints.
+     * @param fs the field to generate a value for
+     * @return a value of the Java type used for {@code fs.type}
+     * @throws ExecException if a bag tuple cannot be populated
+     * @throws RuntimeException if the field type is not one of the handled types
+     */
+    private Object randData(FieldSchema fs) throws ExecException {
+        switch (fs.type) {
+        case DataType.BOOLEAN: {
+            return r.nextBoolean();
+        }
+        case DataType.BYTEARRAY: {
+            byte[] raw = new BigInteger(130, r).toByteArray();
+            return new DataByteArray(raw);
+        }
+        case DataType.CHARARRAY: {
+            BigInteger bits = new BigInteger(130, r);
+            return bits.toString(32);
+        }
+        case DataType.INTEGER: {
+            return r.nextInt();
+        }
+        case DataType.LONG: {
+            return r.nextLong();
+        }
+        case DataType.FLOAT: {
+            return r.nextFloat();
+        }
+        case DataType.DOUBLE: {
+            return r.nextDouble();
+        }
+        case DataType.BAG: {
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
+            int tupleCount = r.nextInt(100);
+            for (int n = 0; n < tupleCount; n++) {
+                int width = r.nextInt(10);
+                Tuple row = TupleFactory.getInstance().newTuple(width);
+                for (int col = 0; col < width; col++) {
+                    row.set(col, r.nextInt());
+                }
+                bag.add(row);
+            }
+            return bag;
+        }
+        default:
+            throw new RuntimeException("Cannot generate data for given FieldSchema: " + fs);
+        }
+    }
+
+    /**
+     * Takes a fresh tuple from the factory, treats it as a SchemaTuple and
+     * checks that the typed getter of every field reports a null field.
+     * @param tf factory whose tuples are cast to SchemaTuple
+     * @throws ExecException if a getter fails with an unexpected error
+     */
+    public void testTypeAwareGetSetting(TupleFactory tf) throws ExecException {
+        checkNullGetThrowsError((SchemaTuple<?>) tf.newTuple());
+    }
+
+    /**
+     * Calls the typed getter matching each field of the tuple's schema and
+     * asserts that every call throws {@link FieldIsNullException}.
+     * @param st the tuple to probe
+     * @throws ExecException if a getter fails with a different Pig error
+     * @throws RuntimeException if the schema contains an unhandled field type
+     */
+    private void checkNullGetThrowsError(SchemaTuple<?> st) throws ExecException {
+        List<Schema.FieldSchema> fields = st.getSchema().getFields();
+        for (int pos = 0; pos < fields.size(); pos++) {
+            Schema.FieldSchema field = fields.get(pos);
+            boolean sawNullError;
+            try {
+                switch (field.type) {
+                case DataType.BOOLEAN:
+                    st.getBoolean(pos);
+                    break;
+                case DataType.BYTEARRAY:
+                    st.getBytes(pos);
+                    break;
+                case DataType.CHARARRAY:
+                    st.getString(pos);
+                    break;
+                case DataType.INTEGER:
+                    st.getInt(pos);
+                    break;
+                case DataType.LONG:
+                    st.getLong(pos);
+                    break;
+                case DataType.FLOAT:
+                    st.getFloat(pos);
+                    break;
+                case DataType.DOUBLE:
+                    st.getDouble(pos);
+                    break;
+                case DataType.TUPLE:
+                    st.getTuple(pos);
+                    break;
+                case DataType.BAG:
+                    st.getDataBag(pos);
+                    break;
+                default:
+                    throw new RuntimeException("Unsupported FieldSchema in SchemaTuple: " + field);
+                }
+                // The getter returned normally, so no null was reported.
+                sawNullError = false;
+            } catch (FieldIsNullException e) {
+                sawNullError = true;
+            }
+            assertTrue(sawNullError);
+        }
+    }
+
+    /**
+     * Round-trips tuples from the given factory through InterRecordWriter and
+     * InterRecordReader and checks that every tuple read back equals the one
+     * that was written. Random bytes are written before the first tuple and
+     * after each tuple.
+     * @param tf factory producing the tuples to write
+     * @throws Exception if writing, reading or a comparison fails
+     */
+    public void testInterStorageSerDe(SchemaTupleFactory tf) throws Exception {
+        int sz = 4096;
+        List<Tuple> written = new ArrayList<Tuple>(sz);
+
+        File temp = File.createTempFile("tmp", "tmp");
+        temp.deleteOnExit();
+        FileOutputStream fos = new FileOutputStream(temp);
+        DataOutputStream dos = new DataOutputStream(fos);
+
+        InterRecordWriter writer = new InterRecordWriter(dos);
+        try {
+            // We add these bytes because a part of the InterStorage logic
+            // is the ability to seek to the next Tuple based on a magic set
+            // of bytes. This emulates the random bytes that will be present
+            // at the beginning of a split.
+            writeRandomBytes(dos, 6);
+
+            for (int i = 0; i < sz; i++) {
+                SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
+                fillWithData(st);
+                writer.write(null, st);
+                written.add(st);
+
+                writeRandomBytes(dos, 3);
+            }
+        } finally {
+            // Close even when a write fails so the temp file handle is not leaked.
+            // NOTE(review): presumably this also closes the wrapped stream - confirm
+            // against InterRecordWriter.
+            writer.close(null);
+        }
+
+        Configuration conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+
+        TaskAttemptID taskId = HadoopShims.createTaskAttemptID("jt", 1, true, 1, 1);
+        conf.set("mapred.task.id", taskId.toString());
+
+        InputSplit is = new FileSplit(new Path(temp.getAbsolutePath()), 0, temp.length(), null);
+
+        InterRecordReader reader = new InterRecordReader();
+        reader.initialize(is, new TaskAttemptContext(conf, taskId));
+        try {
+            for (int i = 0; i < sz; i++) {
+                assertTrue(reader.nextKeyValue());
+                SchemaTuple<?> st = (SchemaTuple<?>)reader.getCurrentValue();
+                assertEquals(written.get(i), st);
+            }
+        } finally {
+            // A failed assertion must not leave the reader open.
+            reader.close();
+        }
+
+    }
+
+    /**
+     * Writes {@code count} bytes taken from the shared random source to the stream.
+     * @param out the stream to write to
+     * @param count how many bytes to write
+     * @throws IOException if a write fails
+     */
+    private void writeRandomBytes(DataOutputStream out, int count) throws IOException {
+        for (int i = 0; i < count; i++) {
+            out.writeByte(r.nextInt());
+        }
+    }
+
+    /**
+     * Writes tuples from the given factory to a temp file with
+     * {@code bis.writeDatum}, reads them back with {@code bis.readDatum} and
+     * checks that each tuple read equals the one written at the same position.
+     * @param tf factory producing the tuples to write
+     * @throws IOException if the temp file cannot be written or read
+     */
+    public void testSerDe(SchemaTupleFactory tf) throws IOException {
+        int sz = 4096;
+        List<Tuple> written = new ArrayList<Tuple>(sz);
+
+        File temp = File.createTempFile("tmp", "tmp");
+        temp.deleteOnExit();
+
+        FileOutputStream fos = new FileOutputStream(temp);
+        try {
+            DataOutput dos = new DataOutputStream(fos);
+            for (int i = 0; i < sz; i++) {
+                SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
+                fillWithData(st);
+                bis.writeDatum(dos, st);
+                written.add(st);
+            }
+        } finally {
+            // Release the file handle even if a write throws.
+            fos.close();
+        }
+
+        assertEquals(sz, written.size());
+
+        FileInputStream fis = new FileInputStream(temp);
+        try {
+            DataInput din = new DataInputStream(fis);
+            for (int i = 0; i < sz; i++) {
+                SchemaTuple<?> st = (SchemaTuple<?>)bis.readDatum(din);
+                assertEquals(written.get(i), st);
+            }
+        } finally {
+            // Release the file handle even if a read or comparison fails.
+            fis.close();
+        }
+    }
+
+    /** Runs the shared join scenario with a replicated join and no pre-sorting. */
+    @Test
+    public void testFRJoinWithSchemaTuple() throws Exception {
+        final boolean preSort = false;
+        testJoinType("replicated", preSort);
+    }
+
+    /** Runs the shared join scenario with a merge join on pre-sorted inputs. */
+    @Test
+    public void testMergeJoinWithSchemaTuple() throws Exception {
+        final boolean preSort = true;
+        testJoinType("merge", preSort);
+    }
+
+    public void testJoinType(String joinType, boolean preSort) throws Exception {
+        Properties props = PropertiesUtil.loadDefaultProperties();
+        props.setProperty("pig.schematuple", "true");
+        PigServer pigServer = new PigServer(ExecType.LOCAL, props);
+
+        Data data = resetData(pigServer);
+
+        data.set("foo1",
+            tuple(0),
+            tuple(1),
+            tuple(2),
+            tuple(3),
+            tuple(4),
+            tuple(5),
+            tuple(6),
+            tuple(7),
+            tuple(8),
+            tuple(9)
+            );
+
+        data.set("foo2",
+            tuple(0),
+            tuple(1),
+            tuple(2),
+            tuple(3),
+            tuple(4),
+            tuple(5),
+            tuple(6),
+            tuple(7),
+            tuple(8),
+            tuple(9)
+            );
+
+        pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int);");
+        pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int);");
+        if (preSort) {
+            pigServer.registerQuery("A = ORDER A BY x ASC;");
+            pigServer.registerQuery("B = ORDER B BY x ASC;");
+        }
+        pigServer.registerQuery("C = JOIN A by x, B by x using '"+joinType+"';");
+        pigServer.registerQuery("D = ORDER C BY $0 ASC;");
+
+        Iterator<Tuple> out = pigServer.openIterator("D");
+        for (int i = 0; i < 10; i++) {
+            if (!out.hasNext()) {
+                throw new Exception("Output should have had more elements! Failed on element: " + i);
+            }
+            assertEquals(tuple(i, i), out.next());
+        }
+        assertFalse(out.hasNext());
+
+        pigServer.registerQuery("STORE D INTO 'bar' USING mock.Storage();");
+
+        List<Tuple> tuples = data.get("bar");
+
+        if (tuples.size() != 10) {
+            throw new Exception("Output does not have enough elements! List: " + tuples);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertEquals(tuple(i, i), tuples.get(i));
+        }
+
+    }
+
+}

Added: pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java (added)
+++ pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Method;
+
+import org.apache.pig.data.utils.MethodHelper.NotImplemented;
+import org.junit.Test;
+
+/**
+ * Exercises {@code MethodHelper.isNotImplementedAnnotationPresent} against a
+ * small class hierarchy in which the {@code NotImplemented} annotation is
+ * added, inherited and dropped at different levels.
+ */
+public class TestMethodHelper {
+    /**
+     * Checks the expected annotation result for both interface methods on
+     * every class of the fixture hierarchy.
+     * @throws NoSuchMethodException if the interface methods cannot be looked up
+     */
+    @Test
+    public void testImplementation() throws NoSuchMethodException {
+        Method t1 = ITest.class.getMethod("t1");
+        Method t2 = ITest.class.getMethod("t2");
+
+        // Test1 annotates both methods.
+        shouldBe(t1, Test1.class, true);
+        shouldBe(t2, Test1.class, true);
+
+        // Test2 annotates only t2.
+        shouldBe(t1, Test2.class, false);
+        shouldBe(t2, Test2.class, true);
+
+        // Test3 overrides nothing, so it is expected to match Test1.
+        shouldBe(t1, Test3.class, true);
+        shouldBe(t2, Test3.class, true);
+
+        // Test4 overrides nothing, so it is expected to match Test2.
+        shouldBe(t1, Test4.class, false);
+        shouldBe(t2, Test4.class, true);
+
+        // Test5 overrides both methods without the annotation.
+        shouldBe(t1, Test5.class, false);
+        shouldBe(t2, Test5.class, false);
+
+        // Test6 re-annotates t1 only.
+        shouldBe(t1, Test6.class, true);
+        shouldBe(t2, Test6.class, false);
+
+        // Test7 overrides t1 again without the annotation.
+        shouldBe(t1, Test7.class, false);
+        shouldBe(t2, Test7.class, false);
+    }
+
+    /**
+     * Asserts that the annotation check for method {@code m} on class {@code c}
+     * yields {@code b}.
+     * @param m the interface method to look up
+     * @param c the class to inspect
+     * @param b the expected result
+     */
+    private void shouldBe(Method m, Class<?> c, boolean b) {
+        // Name the method and class so a failure identifies the offending pair.
+        assertTrue("isNotImplementedAnnotationPresent(" + m.getName() + ", " + c.getSimpleName()
+                + ") expected " + b,
+                MethodHelper.isNotImplementedAnnotationPresent(m, c) == b);
+    }
+
+    /** Interface whose two methods are probed by the test. */
+    static interface ITest {
+        public void t1();
+
+        public void t2();
+    }
+
+    /** Both methods carry the annotation. */
+    static class Test1 implements ITest {
+        @NotImplemented
+        public void t1() {}
+
+        @NotImplemented
+        public void t2() {}
+    }
+
+    /** Only t2 carries the annotation. */
+    static class Test2 implements ITest {
+        public void t1() {}
+
+        @NotImplemented
+        public void t2() {}
+    }
+
+    /** Declares nothing of its own on top of Test1. */
+    static class Test3 extends Test1 {}
+
+    /** Declares nothing of its own on top of Test2. */
+    static class Test4 extends Test2 {}
+
+    /** Overrides both methods without the annotation. */
+    static class Test5 extends Test3 {
+        public void t1() {}
+
+        public void t2() {}
+    }
+
+    /** Overrides t1 with the annotation; t2 is left as declared in Test5. */
+    static class Test6 extends Test5 {
+        @NotImplemented
+        public void t1() {}
+    }
+
+    /** Overrides t1 once more, this time without the annotation. */
+    static class Test7 extends Test6 {
+        public void t1() {}
+    }
+
+}

Modified: pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBag.java Tue Jul  3 20:36:09 2012
@@ -185,8 +185,7 @@ public class TestDataBag extends junit.f
         // Write tuples into both
         for (int j = 0; j < 3; j++) {
             for (int i = 0; i < 10; i++) {
-                Tuple t = TupleFactory.getInstance().newTupleForSchema(DataType.INTEGER);
-                t.set(0, i);
+                Tuple t = TupleFactory.getInstance().newTuple(new Integer(i));
                 b.add(t);
                 rightAnswer.add(t);
             }

Modified: pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Jul  3 20:36:09 2012
@@ -253,7 +253,7 @@ public class TestLogicalPlanBuilder {
         Operator lo = listOp.get(0);
         
         if (lo instanceof LOCogroup) {
-            Assert.assertEquals( 1, ((LOCogroup) lo).getRequestedParallelisam() );//Local mode, paraallel = 1
+            Assert.assertEquals( 1, ((LOCogroup) lo).getRequestedParallelism() );//Local mode, paraallel = 1
         } else {
             Assert.fail("Error: Unexpected Parse Tree output");
         }  

Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Tue Jul  3 20:36:09 2012
@@ -16,24 +16,6 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.pig.test;
 
 import java.io.IOException;