You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/04/03 09:59:14 UTC

svn commit: r1308737 [3/3] - in /pig/trunk: ./ bin/ ivy/ src/main/ src/main/jruby/ src/org/apache/pig/ src/org/apache/pig/impl/util/ src/org/apache/pig/scripting/ src/org/apache/pig/scripting/jruby/ src/org/apache/pig/tools/counters/ test/e2e/pig/ test...

Added: pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java?rev=1308737&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java Tue Apr  3 07:59:12 2012
@@ -0,0 +1,955 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.Arrays;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.data.DataType;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+import org.jruby.Ruby;
+import org.jruby.RubyHash;
+import org.jruby.RubyArray;
+import org.jruby.RubyClass;
+import org.jruby.RubyFixnum;
+import org.jruby.RubyModule;
+import org.jruby.RubyObject;
+import org.jruby.RubyRange;
+import org.jruby.RubyString;
+import org.jruby.RubySymbol;
+import org.jruby.anno.JRubyClass;
+import org.jruby.anno.JRubyMethod;
+import org.jruby.runtime.ObjectAllocator;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.Block;
+import org.jruby.runtime.builtin.IRubyObject;
+
+//TODO implement all of the merge functions
+
+/**
+ * This class encapsulated a native Schema object, and provides a more convenient
+ * interface for manipulating Schemas. It hides the Schema/FieldSchema distinction
+ * from the user, and tries to present a cleaner, more Ruby-esque API to the user.
+ * For general information on JRuby's API definition annotations,
+ * see {@link RubyDataBag}.
+ */
+@JRubyClass(name = "Schema")
+public class RubySchema extends RubyObject {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * This is a pattern used in the conversion from ruby arguments to a valid Schema. It detects
+     * cases where there is a bag, map, or tuple without being followed by {}, [], or () respectively.
+     * It is used for convenience.
+     */
+    private static final Pattern bmtPattern = Pattern.compile("(?:\\S+:)?(bag|map|tuple)\\s*(?:,|$)", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * This is the encapsulated Schema object.
+     */
+    private Schema internalSchema;
+
+    private static final ObjectAllocator ALLOCATOR = new ObjectAllocator() {
+        public IRubyObject allocate(Ruby runtime, RubyClass klass) {
+            return new RubySchema(runtime, klass);
+        }
+    };
+
+    /**
+     * This method registers the class with the given runtime.
+     *
+     * @param runtime an instance of the Ruby runtime
+     * @return        a RubyClass object with metadata about the registered class
+     */
+    public static RubyClass define(Ruby runtime) {
+        RubyClass result = runtime.defineClass("Schema",runtime.getObject(), ALLOCATOR);
+
+        result.kindOf = new RubyModule.KindOf() {
+            public boolean isKindOf(IRubyObject obj, RubyModule type) {
+                return obj instanceof RubySchema;
+            }
+        };
+
+        result.includeModule(runtime.getEnumerable());
+
+        result.defineAnnotatedMethods(RubySchema.class);
+
+        return result;
+    }
+
+    protected RubySchema(final Ruby ruby, RubyClass rc) {
+        super(ruby,rc);
+        internalSchema = new Schema();
+    }
+
+    /**
+     * This constructor sets the encapsulated Schema to be equal to
+     * the given Schema. If copy is true, it is set equal to a copy.
+     * If it is false, it is set directly equal.
+     *
+     * @param ruby an instance of the ruby runtime
+     * @param rc   an instance of the class object with meatadata
+     * @param s    a Schema to encapsulate
+     * @param copy a boolean value. If true, s will be copied and the copy
+     *             will be encapsulated. If false, it will be encapsulated
+     *             directly.
+     */
+    protected RubySchema(final Ruby ruby, RubyClass rc, Schema s, boolean copy) {
+        super(ruby,rc);
+        if (copy) {
+            internalSchema = new Schema(s);
+        } else {
+            internalSchema = s;
+        }
+    }
+
+    /**
+     * This constructor sets the encapsulated Schema to be equal to the
+     * given Schema.
+     *
+     * @param ruby an instance of the ruby runtime
+     * @param rc   an instance of the class object with meatadata
+     * @param s    a Schema to encapsulate
+     */
+    protected RubySchema(final Ruby ruby, RubyClass rc, Schema s) {
+        this(ruby, rc, s, true);
+    }
+
+    /**
+     * This constructor is provided for convenience and sets the
+     * internal Schema equal to the result of a call to
+     * {@link Utils#getSchemaFromString}.
+     *
+     * @param ruby an instance of the ruby runtime
+     * @param rc   an instance of the class object with meatadata
+     * @param s    a String which will be passed to
+     *             {@link Utils#getSchemaFromString}
+     */
+    protected RubySchema(final Ruby ruby, RubyClass rc, String s) {
+        super(ruby, rc);
+        try {
+            internalSchema = Utils.getSchemaFromString(s);
+        } catch (ParserException e) {
+            throw new RuntimeException("Error converting String to Schema: " + s, e);
+        }
+    }
+
+    /**
+     * The ruby initializer accepts any number of arguments. With no arguments,
+     * it will return an empty Schema object. It can accept any number of arguments.
+     * To understand the valid arguments, see the documentation for {@link #rubyArgToSchema}.
+     *
+     * @param args a varargs which can take any number of valid arguments to
+     *             {@link #rubyArgToSchema}
+     * @return     the initialized RubySchema
+     */
+    @JRubyMethod(rest = true)
+    public RubySchema initialize(IRubyObject[] args) {
+        internalSchema = new Schema();
+        for (IRubyObject arg : args) {
+            Schema rs = rubyArgToSchema(arg);
+            for (Schema.FieldSchema i : rs.getFields())
+                internalSchema.add(i);
+        }
+        RubySchema.fixSchemaNames(internalSchema);
+        return this;
+    }
+
+    /**
+     * This is a static helper method to create a null aliased bytearray Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"by", "bytearray"})
+    public static RubySchema nullBytearray(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.BYTEARRAY);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased Boolean Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"bool", "boolean"})
+    public static RubySchema nullBoolean(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.BOOLEAN);
+    }
+
+
+    /**
+     * This is a static helper method to create a null aliased chararray Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"c", "chararray"})
+    public static RubySchema nullChararray(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.CHARARRAY);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased long Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+   @JRubyMethod(meta = true, name = {"l", "long"})
+    public static RubySchema nullLong(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.LONG);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased int Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"i", "int"})
+    public static RubySchema nullInt(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.INTEGER);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased double Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"d", "double"})
+    public static RubySchema nullDouble(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.DOUBLE);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased float Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"f", "float"})
+    public static RubySchema nullFloate(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.FLOAT);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased tuple Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"t", "tuple"})
+    public static RubySchema nullTuple(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.TUPLE);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased bag Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"b", "bag"})
+    public static RubySchema nullBag(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.BAG);
+    }
+
+    /**
+     * This is a static helper method to create a null aliased map Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased bytearray schema
+     */
+    @JRubyMethod(meta = true, name = {"m", "map"})
+    public static RubySchema nullMap(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.MAP);
+    }
+
+    /**
+     * This is a helper method to generate a RubySchema of the given type without an alias.
+     *
+     * @param context the context the method is being executed in
+     * @param type    the DataType.PIGTYPE value to make the Schema from
+     * @return        a RubySchema object encapsulated a Schema of the specified type
+     */
+    private static RubySchema makeNullAliasRubySchema(ThreadContext context, byte type) {
+       Ruby runtime = context.getRuntime();
+       return new RubySchema(runtime, runtime.getClass("Schema"), new Schema(new Schema.FieldSchema(null, type)));
+    }
+
+    /**
+     * This is a helper function which converts objects into Schema objects. The valid
+     * options are as follows:
+     * <p>
+     * A RubyString, which will have {@link Utils#getSchemaFromString} called on it, and
+     * it will be added.
+     * <p>
+     * A RubySchema, which will be added directly. IMPORTANT NOTE: since this API abstracts
+     * away from the distinction between Schema/FieldSchema, its important to understand
+     * how a Schema is added to another. In this case, the FieldSchema is pulled directly
+     * out of the given Schema. Thus, where in Pig a Schema.FieldSchema might be passed around,
+     * internally to this class, generally a Schema will be passed around encapsulating it.
+     * <p>
+     * A list will create the Schema for a Tuple whose elements will be the elements of the
+     * list. Each element will be subjected to the same rules applied here.
+     * <p>
+     * A hash in the form of:<br>
+     * <code>{"name:tuple"=>["x:int","y:int,z:int"], "name2:bag"=>["a:chararray"]}</code><br>
+     * The keys must be a tuple, bag, or map, and the value must be an array.
+     *
+     * @param arg an object (generally an IRubyObject or String) to convert. See above for
+                  the rules on valid arguments
+     * @return    the Schema constructed for the given argument
+     */
+    public static Schema rubyArgToSchema(Object arg) {
+        try {
+            /**
+             * Given a String or a RubyString, calls {@link Utils#getSchemaFromString}.
+             * Additionally, as a convenience to the user, this method uses a regex to
+             * detect any case where a schema declaration of "bag", "tuple", or "map"
+             * does not have the trailing "{}", "()", or "[]" that
+             * {@link Utils#getSchemaFromString} requires.
+             */
+            if (arg instanceof String || arg instanceof RubyString) {
+                String s = arg.toString();
+                Matcher m = bmtPattern.matcher(s);
+                while (m.find()) {
+                    String type = m.group(1);
+                    String inter = s.substring(0, m.start(1));
+
+                    if (type.equalsIgnoreCase("bag")) {
+                         inter += "{}";
+                    } else if (type.equalsIgnoreCase("map")) {
+                         inter += "[]";
+                    } else if (type.equalsIgnoreCase("tuple")) {
+                         inter += "()";
+                    } else {
+                        throw new RuntimeException("Arriving here should be impossible");
+                    }
+
+                    s = inter + s.substring(m.end(1));
+                    m = bmtPattern.matcher(s);
+                }
+                return Utils.getSchemaFromString(s);
+            // In the case of a RubySchema, can just return the encapsulated Schema
+            } else if (arg instanceof RubySchema) {
+                return ((RubySchema)arg).getInternalSchema();
+            // In the case of a RubyArray, the elements of the array are passed to this
+            // method, and they will be treated as elements of a Tuple Schema.
+            } else if (arg instanceof RubyArray) {
+                RubyArray ary = (RubyArray)arg;
+                Schema s = new Schema();
+                for (Object o : ary) {
+                    Schema ts = rubyArgToSchema(o);
+                    for (Schema.FieldSchema fs : ts.getFields()) {
+                      s.add(fs);
+                    }
+                }
+                return new Schema(new Schema.FieldSchema("tuple_0", s, DataType.TUPLE));
+            /**
+             * In the case of a RubyHash, the key serves defines a Schema that will encapsulate
+             * other elements. This mainly is for the convenience of being able to name
+             * bags, maps, and tuples while easily being able to have interchangeable elements.
+             * The key will be given to this method, but must return a singular map, tuple, or
+             * bag, or an error will be thrown. The value to that key must be an array, and
+             * each element will be passed to this method and then added to the Schema for
+             * the key.
+             */
+            } else if (arg instanceof RubyHash) {
+                RubyHash hash = (RubyHash)arg;
+                Schema hashSchema = new Schema();
+                for (Object o : hash.keySet()) {
+                    Schema s = rubyArgToSchema(o);
+                    if (s.size() != 1) {
+                        throw new RuntimeException("Hash key must be singular");
+                    }
+                    Schema.FieldSchema fs = s.getField(0);
+                    Object v = hash.get(o);
+                    if (v instanceof RubyArray) {
+                        byte type = fs.type;
+                        if (type == DataType.BAG) {
+                            fs.schema = rubyArgToSchema(v);
+                        } else if (type == DataType.TUPLE || type == DataType.MAP) {
+                            fs.schema = rubyArgToSchema(v).getField(0).schema;
+                        } else {
+                            throw new RuntimeException("Hash key must be tuple map or bag");
+                        }
+                    } else {
+                        throw new RuntimeException("Hash value must be an Array");
+                    }
+                    hashSchema.add(fs);
+                }
+                return hashSchema;
+            } else {
+                throw new RuntimeException("Bad argument given to rubyToSchema: " + arg + (arg != null ? " class type " + arg.getClass().toString() : ""));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error converting ruby to Schema: " + arg, e);
+        }
+    }
+
+    /**
+     * This is a ruby method which takes a name and an array of arguments and constructs a Tuple schema
+     * from them.
+     *
+     * @param context the context the method is being executed in
+     * @param self    the RubyClass for the Class object this was invoked on
+     * @param arg1    the name for the RubySchema
+     * @param arg2    a list of arguments to instantiate the new RubySchema
+     * @return        the new Tuple RubySchema
+     */
+    @JRubyMethod(meta = true, name = {"t", "tuple"})
+    public static RubySchema tuple(ThreadContext context, IRubyObject self, IRubyObject arg1, IRubyObject arg2) {
+         RubySchema rs = tuple(context, self, arg2);
+         rs.setNameIf(arg1);
+         return rs;
+    }
+
+    /**
+     * This is a ruby method which takes an array of arguments and constructs a Tuple schema from them. The name
+     * will be set automatically.
+     *
+     * @param context the context the method is being executed in
+     * @param self    the RubyClass for the Class object this was invoked on
+     * @param arg     a list of arguments to instantiate the new RubySchema
+     * @return        the new RubySchema
+     */
+    @JRubyMethod(meta = true, name = {"t", "tuple"})
+    public static RubySchema tuple(ThreadContext context, IRubyObject self, IRubyObject arg) {
+        if (arg instanceof RubyArray) {
+            Schema s = rubyArgToSchema(arg);
+            Ruby runtime = context.getRuntime();
+            return new RubySchema(runtime, runtime.getClass("Schema"), s);
+        } else {
+            throw new RuntimeException("Bad argument given to Schema.tuple");
+        }
+    }
+
+    /**
+     * This is a ruby method which takes a name and an array of arguments and constructs a Map schema
+     * from them.
+     *
+     * @param context the context the method is being executed in
+     * @param self    the RubyClass for the Class object this was invoked on
+     * @param arg1    the name for the RubySchema
+     * @param arg2    a list of arguments to instantiate the new RubySchema
+     * @return        the new RubySchema
+     */
+    @JRubyMethod(meta = true, name = {"m", "map"})
+    public static RubySchema map(ThreadContext context, IRubyObject self, IRubyObject arg1, IRubyObject arg2) {
+         RubySchema rs = map(context, self, arg2);
+         rs.setNameIf(arg1);
+         return rs;
+    }
+
+    /**
+     * This is a ruby method which takes an array of arguments and constructs a Map schema from them. The name
+     * will be set automatically.
+     *
+     * @param context the context the method is being executed in
+     * @param self    the RubyClass for the Class object this was invoked on
+     * @param arg     a list of arguments to instantiate the new RubySchema
+     * @return        the new RubySchema
+     */
+    @JRubyMethod(meta = true, name = {"m", "map"})
+    public static RubySchema map(ThreadContext context, IRubyObject self, IRubyObject arg) {
+        Schema s = tuple(context, self, arg).getInternalSchema();
+        Ruby runtime = context.getRuntime();
+        try {
+            return new RubySchema(runtime, runtime.getClass("Schema"), new Schema(new Schema.FieldSchema("map_0", s.getField(0).schema, DataType.MAP)));
+        } catch (FrontendException e) {
+            throw new RuntimeException("Error making map", e);
+        }
+    }
+
+    /**
+     * This is a ruby method which takes a name and an array of arguments and constructs a Bag schema
+     * from them.
+     *
+     * @param context the context the method is being executed in
+     * @param self    the RubyClass for the Class object this was invoked on
+     * @param arg1    the name for the RubySchema
+     * @param arg2    a list of arguments to instantiate the new RubySchema
+     * @return        the new RubySchema
+     */
+    @JRubyMethod(meta = true, name={"b", "bag"})
+    public static RubySchema bag(ThreadContext context, IRubyObject self, IRubyObject arg1, IRubyObject arg2) {
+         RubySchema rs = bag(context, self, arg2);
+         rs.setNameIf(arg1);
+         return rs;
+    }
+
+    /**
+     * This is a ruby method which takes an array of arguments and constructs a Bag schema from them. The name
+     * will be set automatically.
+     *
+     * @param context the context the method is being executed in
+     * @param self    the RubyClass for the Class object this was invoked on
+     * @param arg     a list of arguments to instantiate the new RubySchema
+     * @return        the new RubySchema
+     */
+    @JRubyMethod(meta = true, name = {"b", "bag"})
+    public static RubySchema bag(ThreadContext context, IRubyObject self, IRubyObject arg) {
+        Schema s = tuple(context, self, arg).getInternalSchema();
+        Ruby runtime = context.getRuntime();
+        try {
+            return new RubySchema(runtime, runtime.getClass("Schema"), new Schema(new Schema.FieldSchema("bag_0", s, DataType.BAG)));
+        } catch (FrontendException e) {
+            throw new RuntimeException("Error making map", e);
+        }
+    }
+
+    /**
+     * This method will fix any name conflicts in a schema. It's important to note that
+     * this will change the Schema object itself. It will deal with any collisions in things
+     * named tuple_#, bag_#, map_#, or val_#, as these are generally names generated by
+     * Util.getSchemaFromString. In the case of another name conflict, it will not be
+     * changed, as that name conflict was created by the user.
+     *
+     * @param s a Schema object to fix in place
+     */
+    private static void fixSchemaNames(Schema s) {
+        if (s == null)
+             return;
+        // This regex detects names that could possibly collide that we should change
+        Pattern p = Pattern.compile("(bag_|tuple_|map_|val_)(\\d+)", Pattern.CASE_INSENSITIVE);
+        Set<String> names = new HashSet<String>(s.size(), 1.0f);
+        for (Schema.FieldSchema fs : s.getFields()) {
+            if (fs.alias == null)
+                 continue;
+            Matcher m = p.matcher(fs.alias);
+            if (m.matches() && names.contains(fs.alias)) {
+                String prefix = m.group(1);
+                int suffix = Integer.parseInt(m.group(2));
+                while (names.contains(prefix + suffix))
+                    suffix++;
+                fs.alias = prefix + suffix;
+            }
+            names.add(fs.alias);
+            if (fs.schema != null) {
+                if (fs.type == DataType.BAG) {
+                    try {
+                        fixSchemaNames(fs.schema.getField(0).schema);
+                    } catch (FrontendException e) {
+                        throw new RuntimeException("Error recursively fixing schema: " + s, e);
+                    }
+                } else {
+                    fixSchemaNames(fs.schema);
+                }
+            }
+        }
+    }
+
+    /**
+     * This is just a convenience method which sets the name of the internalSchema to the argument that was given.
+     *
+     * @param arg a RubyString to set the name of the encapsulated Schema object
+     */
+    private void setNameIf(IRubyObject arg) {
+        if (arg instanceof RubyString) {
+            setName(arg.toString());
+        } else {
+            throw new RuntimeException("Bad name given");
+        }
+    }
+
+    /**
+     * This method sets the name of a RubySchema to the name given. It's important to note that
+     * if the RubySchema represents anything other than a tuple, databag, or map then an error
+     * will be thrown.
+     *
+     * @param name a String to set the name of the encapsulated Schema object
+     */
+    private void setName(String name) {
+        Schema.FieldSchema fs;
+
+        try {
+            fs = internalSchema.getField(0);
+        } catch (FrontendException e) {
+            throw new RuntimeException("Error getting field from schema: " + internalSchema, e);
+        }
+
+        byte type = fs.type;
+
+        if (type == DataType.TUPLE || type == DataType.BAG || type == DataType.MAP) {
+            fs.alias = name;
+        } else {
+            throw new RuntimeException("setName cannot be set on Schema: " + internalSchema);
+        }
+    }
+
+    /**
+     * The toString method just leverages Schema's printing.
+     *
+     * @param context the context the method is being executed in
+     * @return        a String representation of the encapsulated Schema object
+     */
+    @JRubyMethod(name = {"to_s", "inspect"})
+    public RubyString toString(ThreadContext context) {
+        return RubyString.newString(context.getRuntime(), internalSchema.toString());
+    }
+
+    /**
+     * This is the ruby method which allows people to access elements of the RubySchema object.
+     * It can be given either a single numeric index, or a Range object to specify a range of indices.
+     * It's important to note that the Schema object returned from this references the Schema stored
+     * internally, so if the user wants to make changes without affecting this object, it must be cloned.
+     *
+     * @param context the context the method is being executed in
+     * @param arg     a Fixnum index, Range object to specify a range of values to return, or
+     *                a String to look up by alias name
+     * @return        the RubySchema object encapsulated the found Schema
+     */
+    @JRubyMethod(name = {"[]", "slice"})
+    public RubySchema get(ThreadContext context, IRubyObject arg) {
+        Ruby runtime = context.getRuntime();
+        if (arg instanceof RubyFixnum) {
+            int index = (int)((RubyFixnum)arg).getLongValue();
+            Schema s;
+            try {
+                s = new Schema(internalSchema.getField(index));
+            } catch (FrontendException e) {
+                throw new RuntimeException("Invalid index given to get function: " + index, e);
+            }
+            return new RubySchema(runtime, runtime.getClass("Schema"), s, false); //returns the actual object itself
+        } else if (arg instanceof RubyRange) {
+            int min = (int)((RubyFixnum)((RubyRange)arg).min(context, Block.NULL_BLOCK)).getLongValue();
+            int max = (int)((RubyFixnum)((RubyRange)arg).max(context, Block.NULL_BLOCK)).getLongValue();
+            return new RubySchema(runtime, runtime.getClass("Schema"), new Schema(internalSchema.getFields().subList(min, max + 1)), false);
+        } else if (arg instanceof RubyString) {
+             try {
+                 return new RubySchema(runtime, runtime.getClass("Schema"), new Schema(internalSchema.getField(arg.toString())), false);
+             } catch (FrontendException e) {
+                 throw new RuntimeException("Unable to find field " + arg.toString() + " in schema " + internalSchema, e);
+             }
+        } else {
+            throw new RuntimeException("Invalid argument given to get function: " + arg.toString());
+        }
+    }
+
+    /**
+     * This is a version of [] which allows the range to be specified as such: [1,2].
+     *
+     * @param context the context the method is being executed in
+     * @param arg1    a Fixnum start index
+     * @param arg2    a Fixnum end index
+     * @return        the RubySchema object encapsulated the found Schema
+     */
+    @JRubyMethod(name = {"[]", "slice"})
+    public RubySchema get(ThreadContext context, IRubyObject arg1, IRubyObject arg2) {
+        if (arg1 instanceof RubyFixnum && arg2 instanceof RubyFixnum) {
+            Ruby runtime = context.getRuntime();
+            int min = (int)((RubyFixnum)arg1).getLongValue();
+            int max = (int)((RubyFixnum)arg2).getLongValue() - 1;
+            return new RubySchema(runtime, runtime.getClass("Schema"), new Schema(internalSchema.getFields().subList(min, max + 1)), false);
+        } else {
+            throw new RuntimeException("Bad arguments given to get function: ( " + arg1.toString() + " , " + arg2.toString()+ " )");
+        }
+    }
+
+    /**
+     * This allows the users to set an index or a range of values to
+     * a specified RubySchema. The first argument must be a Fixnum or Range,
+     * and the second argument may optionally be a Fixnum. The given index
+     * (or range of indices) will be replaced by a RubySchema instantiated
+     * based on the remaining arguments.
+     *
+     * @param context the contextthe method is being executed in
+     * @param args    a varargs which has to be at least length two.
+     * @return        the RubySchema that was added
+     */
+    @JRubyMethod(name = {"[]=", "set"}, required = 2, rest = true)
+    public RubySchema set(ThreadContext context, IRubyObject[] args) {
+        IRubyObject arg1 = args[0];
+        IRubyObject arg2 = args[1];
+        IRubyObject[] arg3 = Arrays.copyOfRange(args, 1, args.length);
+        Schema s = internalSchema;
+        Ruby runtime = context.getRuntime();
+        List<Schema.FieldSchema> lfs = s.getFields();
+        int min, max;
+        if (arg1 instanceof RubyFixnum && arg2 instanceof RubyFixnum) {
+            min = (int)((RubyFixnum)arg1).getLongValue();
+            max = (int)((RubyFixnum)arg2).getLongValue();
+            arg3 = Arrays.copyOfRange(args, 2, args.length);
+        } else if (arg1 instanceof RubyFixnum) {
+            min = (int)((RubyFixnum)arg1).getLongValue();
+            max = min + 1;
+        } else if (arg1 instanceof RubyRange) {
+            min = (int)((RubyFixnum)((RubyRange)arg1).min(context, Block.NULL_BLOCK)).getLongValue();
+            max = (int)((RubyFixnum)((RubyRange)arg1).max(context, Block.NULL_BLOCK)).getLongValue() + 1;
+        } else {
+            throw new RuntimeException("Bad arguments given to get function: ( " + arg1.toString() + " , " + arg2.toString()+ " )");
+        }
+        for (int i = min; i < max; i++)
+            lfs.remove(min);
+        if (arg3 == null || arg3.length == 0)
+            throw new RuntimeException("Must have schema argument for []=");
+        RubySchema rs = new RubySchema(runtime, runtime.getClass("Schema")).initialize(arg3);
+        for (Schema.FieldSchema fs : rs.getInternalSchema().getFields())
+            lfs.add(min++, fs);
+        RubySchema.fixSchemaNames(internalSchema);
+        return rs;
+    }
+
+    /**
+     * This method provides addition semantics, without modifying the original Schema.
+     * This method can be given any number of arguments, much as with the constructor.
+     *
+     * @param context the context the method is being executed in
+     * @param args    a varargs which can be any valid set of arguments that
+     *                can initialize a RubySchema
+     * @return        the Rresult of the addition
+     */
+    @JRubyMethod(name = {"add", "+"}, rest = true)
+    public RubySchema add(ThreadContext context, IRubyObject[] args) {
+        RubySchema rsClone = clone(context);
+        rsClone.addInPlace(context, args);
+        return rsClone;
+    }
+
+    /**
+     * This method provides addition semantics, modifying the original Schema in place.
+     * This method can be given any number of arguments, much as with the constructor.
+     *
+     * @param context the context the method is being executed in
+     * @param args    a varargs which can be any valid set of arguments that
+     *                can initialize a RubySchema
+     */
+    @JRubyMethod(name = "add!", rest = true)
+    public void addInPlace(ThreadContext context, IRubyObject[] args) {
+        Ruby runtime = context.getRuntime();
+        List<Schema.FieldSchema> lfs = internalSchema.getFields();
+        RubySchema rs = new RubySchema(runtime, runtime.getClass("Schema")).initialize(args);
+        for (Schema.FieldSchema fs : rs.getInternalSchema().getFields())
+            lfs.add(fs);
+        RubySchema.fixSchemaNames(internalSchema);
+    }
+
+    /**
+     * @param context the context the method is being executed in
+     * @return        a RubySchema copy of the Schema
+     */
+    @JRubyMethod
+    public RubySchema clone(ThreadContext context) {
+        Ruby runtime = context.getRuntime();
+        return new RubySchema(runtime, runtime.getClass("Schema"), internalSchema);
+    }
+
+    /**
+     * Given a field name this string will search the RubySchema for a FieldSchema
+     * with that name and return it encapsulated in a Schema.
+     *
+     * @param context the context the method is being executed in
+     * @param arg     a RubyString serving as an alias to look
+     *                for in the Schema
+     * @return        the found RubySchema
+     */
+    @JRubyMethod
+    public RubySchema find(ThreadContext context, IRubyObject arg) {
+        if (arg instanceof RubyString) {
+            Ruby runtime = context.getRuntime();
+            return new RubySchema(runtime, runtime.getClass("Schema"), RubySchema.find(internalSchema, arg.toString()), false);
+        } else {
+            throw new RuntimeException("Invalid arguement passed to find: " + arg);
+        }
+    }
+
+    /**
+     * This is a helper method which recursively searches for an alias in the Schema
+     * encapsulated by RubySchema. This is necessary because findFieldSchema uses
+     * canonicalName, not name.
+     *
+     * @param s     the Schema to search through
+     * @param alias
+     * @return      the found RubySchema
+     */
+    private static Schema find(Schema s, String alias) {
+        for (Schema.FieldSchema fs : s.getFields())
+            if (alias.equals(fs.alias))
+                 return new Schema(fs);
+        for (Schema.FieldSchema fs : s.getFields())
+            if (fs.schema != null) {
+                 Schema r = RubySchema.find(fs.schema, alias);
+                 if (r != null)
+                     return r;
+            }
+        return new Schema();
+    }
+
+    /**
+     * Given a field name, this will return the index of it in the schema.
+     *
+     * @param context the context the method is being executed in
+     * @param arg     a field name to look for
+     * @return        the index for that field name
+     */
+    @JRubyMethod
+    public RubyFixnum index(ThreadContext context, IRubyObject arg) {
+        if (arg instanceof RubyString) {
+            try {
+                return new RubyFixnum(context.getRuntime(), internalSchema.getPosition(arg.toString()));
+            } catch (FrontendException e) {
+                throw new RuntimeException("Unable to find position for argument: " + arg);
+            }
+        } else {
+            throw new RuntimeException("Invalid arguement passed to index: " + arg);
+        }
+    }
+
+    /**
+     * @param context the context the method is being executed in
+     * @return        the size of the encapsulated Schema
+     */
+    @JRubyMethod(name = {"size", "length"})
+    public RubyFixnum size(ThreadContext context) {
+        return new RubyFixnum(context.getRuntime(), internalSchema.size());
+    }
+
+    /**
+     * This is a helper method to pull out the native Java type from the ruby object.
+     *
+     * @return the encapsulated Schema
+     */
+    public Schema getInternalSchema() {
+        return internalSchema;
+    }
+
+    /**
+     * This method allows access into the Schema nested in the encapsulated Schema. For example,
+     * if the encapsulated Schema is a bag Schema, this allows the user to access the schema of
+     * the interior Tuple.
+     *
+     * @param context the context the method is being executed in
+     * @return        a RubySchema encapsulating the nested Schema
+     */
+    @JRubyMethod(name = {"get", "inner", "in"})
+    public RubySchema get(ThreadContext context) {
+        if (internalSchema.size() != 1)
+            throw new RuntimeException("Can only return nested schema if there is one schema to get");
+        Ruby runtime = context.getRuntime();
+        try {
+            return new RubySchema(runtime, runtime.getClass("Schema"), internalSchema.getField(0).schema, false);
+        } catch (FrontendException e) {
+            throw new RuntimeException("Schema does not have a nested FieldScema", e);
+        }
+    }
+
+    /**
+     * This method allows the user to see the name of the alias of the FieldSchema of the encapsulated
+     * Schema. This method only works if the Schema has one FieldSchema.
+     *
+     * @param context the context the method is being executed in
+     * @Return        the name of the Schema
+     */
+    @JRubyMethod(name = "name")
+    public RubyString getName(ThreadContext context) {
+        try {
+            if (internalSchema.size() != 1)
+                 throw new RuntimeException("Can only get name if there is one schema present");
+
+            return RubyString.newString(context.getRuntime(), internalSchema.getField(0).alias);
+        } catch (FrontendException e) {
+            throw new RuntimeException("Unable to get field from Schema", e);
+        }
+    }
+
+    /**
+     * This method allows the user to set the name of the alias of the FieldSchema of the encapsulated
+     * Schema. This method only works if the Schema has one FieldSchema.
+     *
+     * @param arg a RubyString to set the name to
+     * @return    the new name
+     */
+    @JRubyMethod(name = "name=")
+    public RubyString setName(IRubyObject arg) {
+        if (arg instanceof RubyString) {
+             if (internalSchema.size() != 1)
+                 throw new RuntimeException("Can only set name if there is one schema present");
+             try {
+                 internalSchema.getField(0).alias = arg.toString();
+                 return (RubyString)arg;
+             } catch (FrontendException e) {
+                 throw new RuntimeException("Unable to get field from Schema", e);
+             }
+        } else {
+             throw new RuntimeException("Improper argument passed to 'name=':" + arg);
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java?rev=1308737&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java (added)
+++ pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java Tue Apr  3 07:59:12 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.counters;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A helper class to deal with Hadoop counters in Pig.  They are stored within the singleton
+ * PigStatusReporter instance, but are null for some period of time at job startup, even after
+ * Pig has been invoked.  This class buffers counters, trying each time to get a valid Reporter and flushing
+ * stored counters each time it does.
+ */
+public class PigCounterHelper {
+  private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
+  private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
+
+  /**
+   * Mocks the Reporter.incrCounter, but adds buffering.
+   * See org.apache.hadoop.mapred.Reporter's incrCounter.
+   */
+  public void incrCounter(String group, String counterName, long incr) {
+    PigStatusReporter reporter = PigStatusReporter.getInstance();
+    if (reporter != null) { // common case
+      Counter counter = reporter.getCounter(group, counterName);
+      if (counter != null) {
+        counter.increment(incr);
+
+        if (counterStringMap_.size() > 0) {
+          for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
+            reporter.getCounter(entry.getKey().first, entry.getKey().second).increment(entry.getValue());
+          }
+          counterStringMap_.clear();
+        }
+        return;
+      }
+    }
+    // In the case when reporter is not available, or we can't get the Counter,
+    // store in the local map.
+    Pair<String, String> key = new Pair<String, String>(group, counterName);
+    Long currentValue = counterStringMap_.get(key);
+    counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
+  }
+
+  /**
+   * Mocks the Reporter.incrCounter, but adds buffering.
+   * See org.apache.hadoop.mapred.Reporter's incrCounter.
+   */
+  public void incrCounter(Enum<?> key, long incr) {
+    PigStatusReporter reporter = PigStatusReporter.getInstance();
+    if (reporter != null && reporter.getCounter(key) != null) {
+      reporter.getCounter(key).increment(incr);
+      if (counterEnumMap_.size() > 0) {
+        for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
+          reporter.getCounter(entry.getKey()).increment(entry.getValue());
+        }
+        counterEnumMap_.clear();
+      }
+    } else { // buffer the increments
+      Long currentValue = counterEnumMap_.get(key);
+      counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
+    }
+  }
+}

Modified: pig/trunk/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/build.xml (original)
+++ pig/trunk/test/e2e/pig/build.xml Tue Apr  3 07:59:12 2012
@@ -21,6 +21,8 @@
   <property name="pig.jar" value="${pig.dir}/pig.jar"/>
   <property name="jython.jar"
     value="${pig.dir}/build/ivy/lib/Pig/jython-2.5.0.jar"/>
+  <property name="jruby.jar"
+    value="${pig.dir}/build/ivy/lib/Pig/jruby-complete-1.6.7.jar"/>
 
   <!-- Separate property name for udfs' build.xml -->
   <property name="pig.jarfile" value="${pig.jar}"/>
@@ -28,6 +30,7 @@
   <property name="udf.java.dir" value="${udf.dir}/java"/>
   <property name="udf.jar" value="${udf.java.dir}/testudf.jar"/>
   <property name="python.udf.dir" value="${udf.dir}/python"/>
+  <property name="ruby.udf.dir" value="${udf.dir}/ruby" />
   <property name="params.dir" value="${basedir}/paramfiles"/>
   <property name="e2e.lib.dir" value="${basedir}/lib"/>
   <property name="streaming.dir" value="${basedir}/streaming"/>
@@ -127,6 +130,7 @@
     <mkdir dir="${tar.dir}/libexec/PigTest/test"/>
     <mkdir dir="${tar.dir}/libexec/PigTest/generate"/>
     <mkdir dir="${tar.dir}/libexec/python"/>
+    <mkdir dir="${tar.dir}/libexec/ruby"/>
     <mkdir dir="${tar.dir}/lib"/>
     <mkdir dir="${tar.dir}/lib/java"/>
     <mkdir dir="${tar.dir}/paramfiles"/>
@@ -170,6 +174,10 @@
       <fileset dir="${python.udf.dir}"/>
     </copy>
 
+    <copy todir="${tar.dir}/libexec/ruby">
+      <fileset dir="${ruby.udf.dir}"/>
+    </copy>
+
     <copy todir="${tar.dir}/paramfiles">
       <fileset file="${params.dir}/params_3"/>
     </copy>
@@ -242,6 +250,8 @@
       <env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/>
       <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/>
       <env key="PH_JYTHON_JAR" value="${jython.jar}"/>
+      <env key="PH_JRUBY_JAR" value="${jruby.jar}"/>
+      <env key="JRUBY_OPTS" value="--1.9"/>
       <env key="HARNESS_CONF" value="${harness.conf.file}"/>
       <env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
       <env key="HADOOP_PREFIX" value="${HADOOP_PREFIX}"/>
@@ -292,6 +302,8 @@
       <env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/>
       <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/>
       <env key="PH_JYTHON_JAR" value="${jython.jar}"/>
+      <env key="PH_JRUBY_JAR" value="${jruby.jar}"/>
+      <env key="JRUBY_OPTS" value="--1.9"/>
       <env key="HARNESS_CONF" value="${harness.conf.file}"/>
       <env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
 

Modified: pig/trunk/test/e2e/pig/conf/default.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/conf/default.conf?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/conf/default.conf (original)
+++ pig/trunk/test/e2e/pig/conf/default.conf Tue Apr  3 07:59:12 2012
@@ -44,6 +44,7 @@ $cfg = {
     , 'scriptPath'       => "$ENV{PH_ROOT}/libexec"
     , 'tmpPath'          => '/tmp/pigtest'
     , 'jythonjar'        => "$ENV{PH_JYTHON_JAR}"
+    , 'jrubyjar'         => "$ENV{PH_JRUBY_JAR}"
 
     #PIG
     , 'testconfigpath'   => "$ENV{HADOOP_CONF_DIR}"

Modified: pig/trunk/test/e2e/pig/conf/local.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/conf/local.conf?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/conf/local.conf (original)
+++ pig/trunk/test/e2e/pig/conf/local.conf Tue Apr  3 07:59:12 2012
@@ -41,6 +41,7 @@ $cfg = {
     , 'scriptPath'       => "$ENV{PH_ROOT}/libexec"
     , 'tmpPath'          => '/tmp/pigtest'
 	, 'jythonjar'        => "$ENV{PH_JYTHON_JAR}"
+    , 'jrubyjar'         => "$ENV{PH_JRUBY_JAR}"
 
     #PIG
     , 'testconfigpath'   => "$ENV{PH_CLUSTER}/conf/"

Modified: pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm Tue Apr  3 07:59:12 2012
@@ -345,6 +345,7 @@ sub getPigCmd($$$)
 
     # set the PIG_CLASSPATH environment variable
 	my $pcp .= $testCmd->{'jythonjar'} if (defined($testCmd->{'jythonjar'}));
+    $pcp .= ":" . $testCmd->{'jrubyjar'} if (defined($testCmd->{'jrubyjar'}));
     $pcp .= ":" . $testCmd->{'classpath'} if (defined($testCmd->{'classpath'}));
 
     # Set it in our current environment.  It will get inherited by the IPC::Run
@@ -801,8 +802,6 @@ sub compareSingleOutput
 {
     my ($self, $testResult, $testOutput, $benchmarkOutput, $log) = @_;
 
-print $log "testResult: $testResult testOutput: $testOutput benchmarkOutput: $benchmarkOutput\n";
-
     # cksum the the two files to see if they are the same
     my ($testChksm, $benchmarkChksm);
     IPC::Run::run((['cat', $testOutput], '|', ['cksum']), \$testChksm,

Modified: pig/trunk/test/e2e/pig/drivers/Util.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/drivers/Util.pm?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/drivers/Util.pm (original)
+++ pig/trunk/test/e2e/pig/drivers/Util.pm Tue Apr  3 07:59:12 2012
@@ -250,6 +250,9 @@ sub getPigCmd
 				if (defined $properties->{'jythonjar'}) {
 					$classpath = "$classpath:" . $properties->{'jythonjar'};
 				}
+				if (defined $properties->{'jrubyjar'}) {
+					$classpath = "$classpath:" . $properties->{'jrubyjar'};
+				}
                 if( $properties->{'exectype'} eq "local") {
                    # in local mode, we should not use
                    # any hadoop-site.xml

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Tue Apr  3 07:59:12 2012
@@ -3435,6 +3435,190 @@ store b into ':OUTPATH:';\,
                 ]
             },
             {
+            'name' => 'RubyUDFs',
+            'tests' => [
+                    {
+                    # test integer square
+                    'num' => 1,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate myfuncs.square(age);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age * age;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test string concat and referencing function without a namespace
+                    'num' => 2,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age, gpa);
+b = foreach a generate myfuncs.concat(name, name);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa:double);
+b = foreach a generate CONCAT(name, name);
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test long and double square, plus two references to the same UDF with different schemas
+                    'num' => 3,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:long, gpa:double);
+b = foreach a generate myfuncs.square(age), myfuncs.square(gpa);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age * age, gpa * gpa;
+store b into ':OUTPATH:';\,
+			        'floatpostprocess' => 1,
+			        'delimiter' => '	',
+                    },
+                    {
+                    # test method with no schema decorator (ie, returns bytearray)
+                    'num' => 4,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = foreach a generate myfuncs.byteconcat(name, name);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = foreach a generate CONCAT(name, name);
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test method with complex types
+                    'num' => 5,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studentcomplextab10k' using PigStorage() as (m:[], t:(name:chararray, age:int, gpa:double), b:{t:(name:chararray, age:int, gpa:double)});
+b = foreach a generate flatten(myfuncs.complexTypes(m, t, b)) as (mm, mt, mb);
+c = foreach b generate mm#'name', mt.$0, mb.$0;
+store c into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studentcomplextab10k' using PigStorage() as (m:[], t:(name:chararray, age:int, gpa:double), b:{t:(name:chararray, age:int, gpa:double)});
+b = foreach a generate SIZE(m#'name'), t.$2, b.$2;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test null input and output
+                    'num' => 6,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studentnulltab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate myfuncs.square(age);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studentnulltab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age * age;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test functions that call other functions and include other files
+                    'num' => 7,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate myfuncs.redirect(age);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age * age;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test that functions with same names resolve correctly across name spaces
+                    'num' => 8,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+register ':SCRIPTHOMEPATH:/ruby/morerubyudfs.rb' using jruby as morefuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate myfuncs.square(age), morefuncs.cube(age), morefuncs.CUBE(age);
+store b into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age * age, age * age * age, age * age * age;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    # test algebraic functions
+                    'num' => 9,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = group a by name;
+c = foreach b generate group, myfuncs.Count(a);
+store c into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = group a by name;
+c = foreach b generate group, COUNT(a);
+store c into ':OUTPATH:';\,
+                    },
+                    {
+                    # test accumulator functions
+                    'num' => 10,
+                    'java_params' => ['-Dpig.accumulative.batchsize=5'],
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/scriptingudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = group a by name;
+c = foreach b generate group, myfuncs.Sum(a.age), myfuncs.Sum(a.gpa);
+d = foreach c generate $0, $1, (double)((int)$2*100)/100;
+store d into ':OUTPATH:';\,
+				    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = group a by name;
+c = foreach b generate group, SUM(a.age), SUM(a.gpa);
+d = foreach c generate $0, $1, (double)((int)$2*100)/100;
+store d into ':OUTPATH:';\,
+                    },
+                    {
+                    'num' => 11,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/morerubyudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate flatten(myfuncs.reverse(name, age));
+store b into ':OUTPATH:';\,
+                     'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age, name;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    'num' => 12,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/morerubyudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = filter a by myfuncs.ISEVEN(age);
+store b into ':OUTPATH:';\,
+                    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = filter a by age%2==0;
+store b into ':OUTPATH:';\,
+                    },
+                    {
+                    'num' => 13,
+                    'java_params' => ['-Dpig.accumulative.batchsize=5'],
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/ruby/morerubyudfs.rb' using jruby as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach (group a all) generate FLATTEN(myfuncs.AppendIndex(a));
+store b into ':OUTPATH:';\,
+                    'verify_pig_script' => q\
+register :FUNCPATH:/testudf.jar;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach (group a all) generate FLATTEN(org.apache.pig.test.udf.evalfunc.AppendIndex(a));
+store b into ':OUTPATH:';\,
+                    },
+                ]
+            },
+            {
             'name' => 'Native',
             'tests' => [
                     {

Added: pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/AppendIndex.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/AppendIndex.java?rev=1308737&view=auto
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/AppendIndex.java (added)
+++ pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/AppendIndex.java Tue Apr  3 07:59:12 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.udf.evalfunc;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class AppendIndex extends AccumulatorEvalFunc<DataBag> {
+    private final static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private final static BagFactory mBagFactory = BagFactory.getInstance();
+
+    private DataBag interBag;
+
+    long ct = 0;
+
+    @Override
+    public void accumulate(Tuple input) throws IOException {
+        if (interBag == null) {
+            interBag = mBagFactory.newDefaultBag();
+            ct = 0;
+        }
+        for (Tuple t : (DataBag)input.get(0)) {
+            Tuple t2 = mTupleFactory.newTupleNoCopy(t.getAll());
+            t2.append(++ct);
+            interBag.add(t2);
+        }
+    }
+
+    @Override
+    public DataBag getValue() {
+        return interBag;
+    }
+
+    public void cleanup() {
+        interBag = null;
+    }
+
+    @Override
+    public Schema outputSchema(Schema inputSchema) {
+        try {
+            inputSchema.getField(0).schema.getField(0).schema.add(new Schema.FieldSchema("index", DataType.LONG));
+            return inputSchema;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}

Added: pig/trunk/test/e2e/pig/udfs/ruby/morerubyudfs.rb
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/ruby/morerubyudfs.rb?rev=1308737&view=auto
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/ruby/morerubyudfs.rb (added)
+++ pig/trunk/test/e2e/pig/udfs/ruby/morerubyudfs.rb Tue Apr  3 07:59:12 2012
@@ -0,0 +1,66 @@
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+############################################################################
+require 'pigudf'
+
+class Myudfs < PigUdf
+    def square num
+        return num**2 if num
+    end
+
+    def cube num
+        return num**3 if num
+    end
+
+    output_schema_function :rev_schema
+
+    def reverse e1, e2
+      [e2,e1]
+    end
+
+    def rev_schema s
+      Schema.t([s[1],s[0]])
+    end
+end
+
+CUBE = PigUdf.evalfunc("x:int") do |v|
+    v**3
+end
+
+ISEVEN = PigUdf.filterfunc do |v|
+    v % 2 == 0
+end
+
+class AppendIndex < AccumulatorPigUdf
+    outputSchema do |inS|
+        inS.in.in.add! Schema.long
+        inS
+    end
+
+    def initialize
+      @ct = 0
+      @inter = DataBag.new
+    end
+
+    def exec b
+        b.each { |t| @inter.add(t << (@ct+=1)) }
+    end
+
+    def get
+        @inter
+    end
+end

Added: pig/trunk/test/e2e/pig/udfs/ruby/scriptingudfs.rb
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/ruby/scriptingudfs.rb?rev=1308737&view=auto
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/ruby/scriptingudfs.rb (added)
+++ pig/trunk/test/e2e/pig/udfs/ruby/scriptingudfs.rb Tue Apr  3 07:59:12 2012
@@ -0,0 +1,104 @@
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+############################################################################
+require 'pigudf'
+
+class Myudfs < PigUdf
+    outputSchemaFunction :squareSchema
+
+    def square num
+        return nil if num.nil?
+        num**2
+    end
+
+    def squareSchema input
+        input
+    end
+
+
+    outputSchemaFunction :squareSchema
+
+    def redirect num
+        return square(num)
+    end
+
+    outputSchema "word:chararray"
+
+    def concat *input
+        input.inject(:+)
+    end
+
+    def byteconcat *input
+        concat *input
+    end
+
+    outputSchema "t:(m:[], t:(name:chararray, age:int, gpa:double), b:{t:(name:chararray, age:int, gpa:double)})"
+
+    def complexTypes(m, t, b)
+        outm = {}
+        outm["name"] = m["name"].to_s.length if m
+
+        outt = Array.new(3)
+        outt = t.reverse if t
+
+        outb = DataBag.new
+        if b
+            b.each do |x|
+                tmpout = Array.new(3)
+                tmpout = x.reverse
+                outb.add(tmpout)
+            end
+        end
+        out = Array.new(3)
+        out[0] = outm if !outm.empty?
+        out[1] = outt if !outt.all? {|x| x.nil?}
+        out[2] = outb if outb.size() > 0
+        out
+    end
+
+end
+
+class Count < AlgebraicPigUdf
+    output_schema Schema.long
+
+    def initial t
+        t.nil? ? 0 : 1
+    end
+
+    def intermed t
+        return 0 if t.nil?
+        t.flatten.inject(:+)
+    end
+
+    def final t
+        intermed(t)
+    end
+
+end
+
+class Sum < AccumulatorPigUdf
+    output_schema { |i| i.in.in[0] }
+
+    def exec items
+        @sum ||= 0
+        @sum += items.flatten.inject(:+)
+    end
+
+    def get
+        @sum
+    end
+end

Added: pig/trunk/test/e2e/pig/udfs/ruby/udf2.rb
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/ruby/udf2.rb?rev=1308737&view=auto
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/ruby/udf2.rb (added)
+++ pig/trunk/test/e2e/pig/udfs/ruby/udf2.rb Tue Apr  3 07:59:12 2012
@@ -0,0 +1,30 @@
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+############################################################################
+require 'pigudf'
+
+#require 'find'
+#a=[]
+#Find.find('./') {|x| a << x if (x =~ /.*\.rb/i)}
+require './libexec/ruby/morerubyudfs.rb'
+
+class TestUDFs < PigUdf
+    def squaresquare(num)
+        @udfs ||= Myudfs.new
+        @udfs.square(num)**2 if num
+    end
+end

Modified: pig/trunk/test/org/apache/pig/test/TestUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUDF.java?rev=1308737&r1=1308736&r2=1308737&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUDF.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUDF.java Tue Apr  3 07:59:12 2012
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import junit.framework.TestCase;
 
@@ -192,6 +193,41 @@ public class TestUDF extends TestCase {
         }
     }
 
+    @Test
+    public void testHelperEvalFunc() throws Exception {
+        String pref="org.apache.pig.test.utils.HelperEvalFuncUtils$";
+        String[][] UDF = {
+            {pref + "BasicSUM", pref + "AccSUM", pref + "AlgSUM", "SUM"},
+            {pref + "BasicCOUNT", pref + "AccCOUNT", pref + "AlgCOUNT", "COUNT"},
+            {"BasLCWC", "AccLCWC", "AlgLCWC", "5*COUNT"}
+        };
+        String input = "udf_test_helper_eval_func.txt";
+        Util.createLocalInputFile(input, new String[]{"1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14\n15"});
+        for (String[] udfs : UDF) {
+            for (int i = 0; i < udfs.length - 1; i++) {
+                String query = "DEFINE BasLCWC " + pref + "BasicLongCountWithConstructor('5');";
+                query += "DEFINE AccLCWC " + pref +" AccLongCountWithConstructor('5');";
+                query += "DEFINE AlgLCWC " + pref + "AlgLongCountWithConstructor('5');";
+                query += "A = load '" + input + "' as (x:int);";
+                query += "B = foreach (group A all) generate ";
+                for (String s : Arrays.copyOfRange(udfs, i, udfs.length - 1)) {
+                    query += s + "(A),";
+                }
+                query += udfs[udfs.length - 1] + "(A);";
+                PigServer pigServer = new PigServer(ExecType.LOCAL);
+                pigServer.registerQuery(query);
+                Iterator<Tuple> it = pigServer.openIterator("B");
+                while (it.hasNext()) {
+                    Tuple t = it.next();
+                    Long val = (Long)t.get(0);
+                    for (int j = 1; j < i; j++) {
+                        assertEquals(val, t.get(j));
+                    }
+                }
+            }
+        }
+    }
+
 	@Override
 	@After
 	public void tearDown() throws Exception {

Added: pig/trunk/test/org/apache/pig/test/utils/HelperEvalFuncUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/HelperEvalFuncUtils.java?rev=1308737&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/HelperEvalFuncUtils.java (added)
+++ pig/trunk/test/org/apache/pig/test/utils/HelperEvalFuncUtils.java Tue Apr  3 07:59:12 2012
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.builtin.COUNT;
+import org.apache.pig.builtin.LongSum;
+import org.apache.pig.data.Tuple;
+
+public class HelperEvalFuncUtils {
+    public static class AlgCOUNT extends AlgFunc<Long> {
+        public AlgCOUNT() {
+            super(new COUNT());
+        }
+    }
+
+    public static class AccCOUNT extends AccFunc<Long> {
+        public AccCOUNT() {
+            super(new COUNT());
+        }
+    }
+
+    public static class BasicCOUNT extends BasicFunc<Long> {
+        public BasicCOUNT() {
+            super(new COUNT());
+        }
+    }
+
+    public static class AlgSUM extends AlgFunc<Long> {
+        public AlgSUM() {
+            super(new LongSum());
+        }
+    }
+
+    public static class AccSUM extends AccFunc<Long> {
+        public AccSUM() {
+            super(new LongSum());
+        }
+    }
+
+    public static class BasicSUM extends BasicFunc<Long> {
+        public BasicSUM() {
+            super(new LongSum());
+        }
+    }
+
+    public static class AlgFunc<T> extends AlgebraicEvalFunc<T> {
+        Algebraic f;
+
+        public AlgFunc(Algebraic f) {
+            this.f=f;
+        }
+
+        @Override
+        public String getInitial() {
+            if (f==null) {
+                return LongSum.Initial.class.getName();
+            }
+            return f.getInitial();
+        }
+
+        @Override
+        public String getIntermed() {
+            if (f==null) {
+                return LongSum.Intermediate.class.getName();
+            }
+            return f.getIntermed();
+        }
+
+        @Override
+        public String getFinal() {
+            if (f==null) {
+                return LongSum.Final.class.getName();
+            }
+            return f.getFinal();
+        }
+    }
+
+    public static class AccFunc<T> extends AccumulatorEvalFunc<T> {
+        Accumulator<T> f;
+
+        public AccFunc(Accumulator<T> f) {
+            this.f=f;
+        }
+
+        @Override
+        public T getValue() {
+            return f.getValue();
+        }
+
+        @Override
+        public void accumulate(Tuple input) throws IOException {
+            f.accumulate(input);
+        }
+
+        @Override
+        public void cleanup() {
+            f.cleanup();
+        }
+    }
+
+    public static class BasicFunc<T> extends EvalFunc<T> {
+        EvalFunc<T> f;
+
+        public BasicFunc(EvalFunc<T> f) {
+            this.f=f;
+        }
+
+        @Override
+        public T exec(Tuple input) throws IOException {
+            return f.exec(input);
+        }
+    }
+
+    public static class AccLongCountWithConstructor extends AccFunc<Long> {
+        public AccLongCountWithConstructor(String mult) {
+            super(new AlgLongCountWithConstructor(mult));
+        }
+    }
+
+    public static class BasicLongCountWithConstructor extends BasicFunc<Long> {
+        public BasicLongCountWithConstructor(String mult) {
+            super(new AlgLongCountWithConstructor(mult));
+        }
+    }
+
+    //this exists to make sure that constructors work properly with AlgebraicEvalFunc
+    public static class AlgLongCountWithConstructor extends AlgebraicEvalFunc<Long> {
+        //this will inflate the count by the given factor
+        public AlgLongCountWithConstructor(String mult) {
+            super(mult);
+        }
+
+        public static class Initial extends COUNT.Initial {
+            long mult=1;
+            public Initial() {}
+
+            public Initial(String mult) {
+                this.mult = Long.parseLong(mult);
+            }
+
+            public Tuple exec(Tuple input) throws IOException {
+                Tuple t = super.exec(input);
+                t.set(0,((Long)t.get(0))*mult);
+                return t;
+            }
+        }
+
+        public static class Intermed extends COUNT.Intermediate {
+            public Intermed() {}
+            public Intermed(String mult) {}
+        }
+
+        public static class Final extends COUNT.Final {
+            public Final() {}
+            public Final(String mult) {}
+        }
+
+        @Override
+        public String getInitial() {
+            return Initial.class.getName();
+        }
+
+        @Override
+        public String getIntermed() {
+            return Intermed.class.getName();
+        }
+
+        @Override
+        public String getFinal() {
+            return Final.class.getName();
+        }
+    }
+}