You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/04/03 09:56:02 UTC

svn commit: r1308736 [1/3] - in /pig/branches/branch-0.10: ./ bin/ ivy/ src/main/ src/main/jruby/ src/org/apache/pig/ src/org/apache/pig/impl/util/ src/org/apache/pig/scripting/ src/org/apache/pig/scripting/jruby/ src/org/apache/pig/tools/counters/ tes...

Author: daijy
Date: Tue Apr  3 07:56:01 2012
New Revision: 1308736

URL: http://svn.apache.org/viewvc?rev=1308736&view=rev
Log:
PIG-2317: Ruby/Jruby UDFs

Added:
    pig/branches/branch-0.10/src/main/
    pig/branches/branch-0.10/src/main/jruby/
    pig/branches/branch-0.10/src/main/jruby/pigudf.rb
    pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java
    pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/PigJrubyLibrary.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/RubyDataBag.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/RubyDataByteArray.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/RubySchema.java
    pig/branches/branch-0.10/src/org/apache/pig/tools/counters/
    pig/branches/branch-0.10/src/org/apache/pig/tools/counters/PigCounterHelper.java
    pig/branches/branch-0.10/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/AppendIndex.java
    pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/
    pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/morerubyudfs.rb
    pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/scriptingudfs.rb
    pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/udf2.rb
    pig/branches/branch-0.10/test/org/apache/pig/test/utils/HelperEvalFuncUtils.java
Modified:
    pig/branches/branch-0.10/CHANGES.txt
    pig/branches/branch-0.10/bin/pig
    pig/branches/branch-0.10/build.xml
    pig/branches/branch-0.10/ivy.xml
    pig/branches/branch-0.10/ivy/libraries.properties
    pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java
    pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java
    pig/branches/branch-0.10/test/e2e/pig/build.xml
    pig/branches/branch-0.10/test/e2e/pig/conf/default.conf
    pig/branches/branch-0.10/test/e2e/pig/conf/local.conf
    pig/branches/branch-0.10/test/e2e/pig/drivers/TestDriverPig.pm
    pig/branches/branch-0.10/test/e2e/pig/drivers/Util.pm
    pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf
    pig/branches/branch-0.10/test/org/apache/pig/test/TestUDF.java

Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Tue Apr  3 07:56:01 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2317: Ruby/Jruby UDFs (jcoveney via daijy)
+
 PIG-2619: HBaseStorage constructs a Scan with cacheBlocks = false (andy lindeman via jcoveney)
 
 PIG-1270: Push limit into loader (daijy)

Modified: pig/branches/branch-0.10/bin/pig
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/bin/pig?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/bin/pig (original)
+++ pig/branches/branch-0.10/bin/pig Tue Apr  3 07:56:01 2012
@@ -135,6 +135,15 @@ if [ -z "$JYTHON_JAR" ]; then
     fi
 fi
 
+JRUBY_JAR=`echo ${PIG_HOME}/lib/jruby-complete-*.jar`
+
+if [ -z "$JRUBY_JAR" ]; then
+    JRUBY_JAR=`echo $PIG_HOME/build/ivy/lib/Pig/jruby-complete-*.jar`
+    if [ -n "$JRUBY_JAR" ]; then
+        CLASSPATH=${CLASSPATH}:$JRUBY_JAR
+    fi
+fi
+
 for f in $PIG_HOME/share/pig/lib/*.jar; do
     CLASSPATH=${CLASSPATH}:$f;
 done

Modified: pig/branches/branch-0.10/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/build.xml?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/build.xml (original)
+++ pig/branches/branch-0.10/build.xml Tue Apr  3 07:56:01 2012
@@ -589,6 +589,7 @@
                 </section>
             </manifest>
             <fileset file="${basedir}/conf/pig-default.properties" />
+            <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
         </jar>
         <!-- @depricated -->
         <jar jarfile="${output.jarfile}" basedir="${build.classes}">
@@ -623,6 +624,7 @@
                 <include name="zookeeper*.jar"/>
             </zipgroupfileset>
             <fileset file="${basedir}/conf/pig-default.properties" />
+            <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
         </jar>
         <antcall target="include-meta" inheritRefs="true" inheritall="true"/>
         <copy file="${output.jarfile}" tofile="${output.jarfile.backcompat}"/>
@@ -680,6 +682,7 @@
                 </section>
             </manifest>
             <fileset file="${basedir}/conf/pig-default.properties" />
+            <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
         </jar>
         <!-- @depricated -->
         <jar jarfile="${output.jarfile.withouthadoop}" basedir="${build.classes}">
@@ -703,6 +706,7 @@
                 <include name="guava-${guava.version}.jar" />
             </zipgroupfileset>
             <fileset file="${basedir}/conf/pig-default.properties" />
+            <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
         </jar>
         <copy file="${output.jarfile.withouthadoop}" tofile="${output.jarfile.backcompat.withouthadoop}"/>
     </target>

Modified: pig/branches/branch-0.10/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/ivy.xml?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/ivy.xml (original)
+++ pig/branches/branch-0.10/ivy.xml Tue Apr  3 07:56:01 2012
@@ -195,6 +195,8 @@
     <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="compile->master"/>
     <dependency org="org.jboss.netty" name="netty" rev="3.2.2.Final" conf="compile->master"/>
 
+    <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
+
     <dependency org="org.apache.hbase" name="hbase" rev="${hbase.version}" conf="compile->master">
       <artifact name="hbase" type="jar"/>
       <artifact name="hbase" type="test-jar" ext="jar" m:classifier="tests"/>

Modified: pig/branches/branch-0.10/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/ivy/libraries.properties?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/ivy/libraries.properties (original)
+++ pig/branches/branch-0.10/ivy/libraries.properties Tue Apr  3 07:56:01 2012
@@ -52,6 +52,7 @@ joda-time.version=1.6
 jsch.version=0.1.38
 json-simple.version=1.1
 junit.version=4.5
+jruby.version=1.6.7
 jython.version=2.5.0
 rhino.version=1.7R2
 antlr.version=3.4

Added: pig/branches/branch-0.10/src/main/jruby/pigudf.rb
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/main/jruby/pigudf.rb?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/main/jruby/pigudf.rb (added)
+++ pig/branches/branch-0.10/src/main/jruby/pigudf.rb Tue Apr  3 07:56:01 2012
@@ -0,0 +1,421 @@
+throw "pigudf.rb only works under JRuby!" unless RUBY_PLATFORM=="java"
+require 'jruby'
+org.apache.pig.scripting.jruby.PigJrubyLibrary.new.load(JRuby.runtime, false)
+
+#TODO output_schema should accept a Schema object as well, and use Schema objects
+#TODO AccumulatorEvalFunc output_schema should not allow you to give a block and defined it
+
+# This is the base class for run-of-the-mill EvalFuncs. A class just serves to
+# contain similar jobs, as well as allow for method reuse. In the case of simple
+# EvalFuncs, each method will be turned into a UDF (though they do not have to be called).
+#
+# TODO: EXPLAIN SYNTAX
+
+class PigUdf
+
+  # Here we initialize the variables we'll be using at the class level (generally
+  # analogous to static in Java). The nice thing about this method is that these
+  # values are all set in the same way, even from children. Thus, all of the children
+  # will update PigUdf.@@functions_to_register, and on the Java side when we want to
+  # access this, we can return that Map. This means it is no longer necessary to keep
+  # track of descendent children, etc, since all that matters are the methods that
+  # are registered with subclasses.
+  #
+  # The @@class_object_to_name_and_add variable is used by self.evalfunc and self.filterfunc.
+  # See the documentation for the former to understand why it is necessary. @@schema holds
+  # the last schema given by output_schema or output_schema_function, see the documentation
+  # on output_schema for more.
+
+  @@functions_to_register = {}
+  @@class_object_to_name_and_add = nil
+  @@schema = nil
+
+  # See the documentation on self.evalfunc for why this is necessary. This takes the current class
+  # object and registers it. This is necessary because self.evalfunc has to return before .to_s
+  # will return something meaningful and not gibberish.
+
+  def self.name_and_add_class_object
+    if @@class_object_to_name_and_add
+      name = @@class_object_to_name_and_add.class_object.to_s
+      @@class_object_to_name_and_add.method_name = "eval"
+      @@functions_to_register[name] = @@class_object_to_name_and_add
+    end
+    @@class_object_to_name_and_add = nil
+  end
+
+  # This is the core function that registers a method as a UDF. The pig_func_name
+  # identifies it, and in most cases, is the method name (the exception begin
+  # UDFs created using self.evalfunc). The class_object is the class against an instance
+  # of which the method will be called. The arity is so Pig knows how many arguments
+  # to pass to the UDF, and the output_schema defines the Schema of the output, either
+  # as a string, or as a function.
+
+  def self.register_function pig_func_name, class_object, arity, output_schema
+    self.name_and_add_class_object
+
+    pig_func_name = pig_func_name.to_s
+
+    reg = EvalFunc.new class_object, pig_func_name, arity, output_schema
+
+    @@functions_to_register[pig_func_name] = reg
+  end
+
+  def self.set_class_object_to_name_and_add func
+    self.name_and_add_class_object
+    @@class_object_to_name_and_add = func
+  end
+
+  # This method provides the most succinct way to define a UDF. The syntax is as follows:
+  #
+  # UdfName = PigUdf.evalfunc('int') do |arg1|
+  #   return arg.length
+  # end
+  #
+  # EvalFunc takes one parameter, the schema to be returned, and a block which will represent
+  # the method call.
+  #
+  # In the case that this will be used, then it will be one class with one function,
+  # and the function name will be UdfName. It is essential that UdfName begin with
+  # a capital letter, as this method uses a hook given to ruby where Name = Class.new
+  # will generate a class of name Name, but only if Name begins with a capital letter.
+  #
+  # The reason for naming the function "GETCLASSFROMOBJECT" is that the class object must first
+  # be returned for its name to be available. Asking it for its name before allowing "evalfunc"
+  # to return will not yield the name it is given. Thus, we plant "GETCLASSFROMOBJECT" so the next
+  # time we access @functions_to_register, we know to check.
+
+  def self.evalfunc output_schema, &blk
+    c=Class.new do
+      define_method :eval do |*args|
+        blk.call(*args)
+      end
+    end
+    self.set_class_object_to_name_and_add EvalFunc.new c, "GETFROMCLASSOBJECT", blk.arity, output_schema
+    c
+  end
+
+  # This method functions identically to evalfunc above, the only difference being that no schema
+  # needs to be given.
+
+  def self.filterfunc &blk
+    c=Class.new do
+      define_method :eval do |*args|
+        blk.call(*args)
+      end
+    end
+    self.set_class_object_to_name_and_add EvalFunc.new c, "GETFROMCLASSOBJECT", blk.arity, Schema.boolean
+    c
+  end
+
+  # This is the function which register the schema associated with a given function. There are
+  # two ways that it can be invoked, with one argument or two (thus the vague argument names).
+  #
+  # case 1: one argument
+  # In this case, output_schema's argument is the schema to be set for the next method declaration.
+  # For example:
+  # output_schema "long"
+  #
+  # The above would mean that the schema for the function following it would be set to long. The mechanism
+  # by which this is achieved is by setting a class schema variable to the schema, and the next time
+  # a method is declared in the class, the class uses the schema that was set to register the function being
+  # declared. For more information on that, see self.method_added, as this is the Ruby provided hook
+  # that is used to allow this disconnect between declaring a schema and the method declaration that follows.
+  #
+  # case 2: two arguments
+  # In this case, arg1 is the name of the function whose schema we want to set, and arg2 is
+  # the schema, ie
+  #
+  # output_schema :sum, "long"
+  #
+  # You can only use this after the function is declared, otherwise there will be an error.
+  # In this case, the information passed to the registration function is the function name,
+  # an instance of the class (so that on the Java side we can instantiate a version), the arity,
+  # and the schema. For more information on how that information is used, see self.register_function.
+  #
+  # The following two uses are identical:
+  #
+  # use 1:
+  # output_schema "long"
+  # def sum x, y
+  #   return x + y
+  # end
+  #
+  # use 2:
+  # def sum x,y
+  #   return x + y
+  # end
+  # output_schema :sum, "long"
+
+  def self.output_schema arg1, arg2=nil
+    if arg2
+      function_name = arg1.to_s
+      schema = arg2.to_s
+      self.register_function function_name, self, function_name, schema
+    else
+      @@schema = arg1
+    end
+  end
+
+  # This function acts identically to output_schema, except that it is not necessary to provide a schema string
+  # because a filter func will always have a set schema (it will return boolean).
+
+  def self.filter_func arg1=nil
+    schema = "FILTERFUNC"
+    if arg1
+      function_name = arg1.to_s
+      self.output_schema function_name, schema
+    else
+      self.output_schema schema
+    end
+  end
+
+  # output_schema is only useful when the function at hand has a deterministic schema. In the case that the schema
+  # needs to be dynamic, it is useful to be able to process the input schema with a function and return the appropriate
+  # output schema. An example of this might be a concat function, which takes two values and concatenates them together.
+  # This function could work for chararrays, but also for bytearrays. In that case, the output schema depends on the input schema.
+  #
+  # As with output_schema, there are two cases, and they are identical (see output_schema for a more detailed explanation).
+  # The difference, however, is that instead of passing a string ie "long", the user gives a function name. Note: the schema
+  # function does not yet have to be defined. In the case of two arguments, the same information is passed to register_function
+  # as in the case of output_schema, the difference being that while the schema is passed as a string, it has an identifier
+  # appended to it so that when this function is running in Java, we'll know that we should be using a function.
+
+  def self.output_schema_function arg1, arg2=nil #TODO allow it to also accept a block, as in ComplexPigUdf
+    schema_func = (arg2||arg1).to_sym
+    if arg2
+      function_name = arg1.to_s
+      self.register_function function_name, self, function_name, schema_func.to_sym
+    else
+      @@schema = arg1.to_sym
+    end
+  end
+
+  # Javaists love their camelCase
+  class << self
+    alias :outputSchema :output_schema
+    alias :filterFunc :filter_func
+    alias :outputSchemaFunction :output_schema_function
+  end
+
+  # This is a hook that Ruby provides that is called whenever a method is declared on the subclass.
+  # This is used so that we have visibility on the methods as they are declared, which is useful because
+  # every declared method will be registered as a UDF for use in Pig. In the case of a method that doesn't
+  # yet have a schema declared, it's return type will just be a bytearray, as in Pig.
+
+  def self.method_added function_name
+    if @@schema
+      self.register_function function_name, self, function_name, @@schema
+    elsif !@@functions_to_register[function_name]
+      self.register_function function_name, self, function_name, nil
+    end
+    @@schema = nil
+  end
+
+  # This returns the map that maintains the Function classes that have information on declared methods.
+
+  def self.get_functions_to_register
+    self.name_and_add_class_object
+
+    @@functions_to_register
+  end
+
+  # The Function class privates a convenient wrapper to store information about EvalFuncs, separating
+  # out the methods that will be used on the frontend to get information on the method registered.
+
+  class Function
+    attr_accessor :method_name
+    attr_reader :arity, :class_object
+
+    def initialize class_object, method_name, arity
+      @class_object = class_object
+      @method_name = method_name
+      @arity = arity
+    end
+
+    def required_args
+      if @arity.is_a? Numeric
+        @arity
+      else
+        @class_object.instance_method(@arity.to_sym).parameters.count {|x,y| x==:req}
+      end
+    end
+
+    def optional_args
+      if @arity.is_a? Numeric
+        0
+      else
+        params = @class_object.instance_method(@arity.to_sym).parameters
+        return -1 if params.any? {|x,y| x==:rest}
+        params.count {|x,y| x==:opt}
+      end
+    end
+
+    # This conveniently gives an instance of the class this Function wraps, so that on the Java end
+    # it is trivial to get the object against which method calls can be made.
+
+    def get_receiver
+      @class_object.new
+    end
+
+    # This is useful for identifying the subclass Java is dealing with (EvalFunc, FilterFunc, etc)
+
+    def name
+      return self.class.to_s
+    end
+  end
+
+  class EvalFunc < Function
+    def initialize class_object, method_name, arity, schema_or_func
+      super class_object, method_name, arity
+      @schema_or_func = schema_or_func
+    end
+
+    # This is the function that will be used from Java to get the proper schema of the output.
+    # Given that users have two options, output_schema or output_schema_function, this method
+    # detects which and acts appropriately. It must be given an instance of the EvalFunc (generally
+    # the result of "get_receiver") in the case of an output_schema_function so that it can evaluate
+    # the output Schema based on the input Schema.
+
+    def schema input_schema, class_instance
+      if !@schema_or_func
+         return Schema.bytearray
+      elsif @schema_or_func.is_a? String
+         return Schema.new @schema_or_func
+      elsif @schema_or_func.is_a? Schema
+         return @schema_or_func
+      else
+         func = @schema_or_func
+         func = @class_object.instance_method(func) if func.is_a? Symbol
+         return func.bind(class_instance).call input_schema
+      end
+    end
+  end
+end
+
+# This is the base class used for Algebraic and Accumulator functions. The reason for the different
+# implementation is because there is more structure in these cases. In the case of general EvalFuncs,
+# a method is equivalent to a UDF. In the case of Algebraic and Accumulator UDFs, however, a class is
+# equivalent to a UDF. Thus, instead of keeping track of methods added, we keep track of classes
+# that extend our Algebraic and Accumulator UDF base classes.
+
+class ComplexUdfBase
+  # As with the basic PigUdf, there is a class method "output_schema" which defines the schema for the class.
+  # This method can be called anywhere (as there is not the issue of multiple UDFs to worry about). If it is not
+  # called, it will have return type bytearray.
+
+  def self.output_schema schema
+    @schema = schema
+  end
+
+  class << self
+    alias :outputSchema :output_schema
+  end
+
+  # This returns the schema, or in the case that one was not supplied, a Schema of bytearray.
+
+  def self.get_output_schema
+    Schema.new(@schema||Schema.bytearray)
+  end
+
+  # Since a class = a UDF, in this case it makes sense to traverse the tree of decendant classes
+  # in order to pull all of the registered classes. It's important to note
+
+  def self.classes_to_register
+    classes = {}
+    ObjectSpace.each_object(Class) do |c|
+      classes[c.to_s] = c if c.ancestors.include?(self) and (c != self)
+    end
+    classes
+  end
+
+  # This is a method that can be used by Pig to ensure that all of the necessary methods are present, so that
+  # the function will throw an error on parsing instead of on execution. This is a shell implementation
+  # to ensure that necessary_methods is called by a subclass, which will then generate the proper implementation.
+
+  def self.check_if_necessary_methods_present
+    throw "Need to declare the methods that should be present"
+  end
+
+  # This is a method that, if called at the class level, defines a set of methods that must be called
+  # by any child classes (ie UDFs).
+
+  def self.necessary_methods *m
+    self.instance_eval "def self.check_if_necessary_methods_present; #{Array(m).inspect}.all? { |m| self.method_defined? m }; end"
+  end
+end
+
+# This is the class that any Accumulator UDF must extend. The necessary_methods call ensures that all
+# child classes have the necessary methods implemented. AccumulatorPigUdfs support dynamic output_schema.
+# To do so, register a block with the schema function, as so:
+# output_schema do |input|
+#  input
+# end
+#
+# In the case of a non-dynamic output schema, it's still possible to just set output_schema "long".
+#
+# An example of an accumulator UDF is:
+#
+# class SUM < AccumulatorPigUdf
+#   output_schema "long"
+#
+#   def exec input
+#     @res ||= 0
+#     @res += input.flatten.inject(:+)
+#   end
+#   def get
+#     @res
+#   end
+# end
+
+class AccumulatorPigUdf < ComplexUdfBase
+  def self.output_schema schema=nil, &blk
+    if block_given?
+      throw "Can specify block or schema but not both!" if schema
+      throw "Block must accept one argument!" if blk.arity != 1
+      @schema = blk
+    else
+      @schema = schema
+    end
+  end
+
+  class << self
+    alias :outputSchema :output_schema
+  end
+
+  def self.get_output_schema input_schema=nil
+    if input_schema && @schema.class == Proc
+      @schema.call input_schema
+    else
+      Schema.new(@schema||Schema.bytearray)
+    end
+  end
+
+  necessary_methods :exec, :get
+end
+
+# This is the class that any Algebraic UDF must extend. The necessary_methods call ensures that all
+# child classes have the necessary methods implemented.
+#
+# An example of an Algebraic UDF is:
+#
+# class Count < AlgebraicPigUdf
+#   output_schema "long"
+#
+#   def initial t
+#     t.nil? ? 0 : 1
+#   end
+#
+#   def intermed t
+#     return 0 if t.nil?
+#     return t.flatten.inject(:+)
+#   end
+#
+#   def final t
+#     return intermed(t)
+#   end
+# end
+
+
+class AlgebraicPigUdf < ComplexUdfBase
+  necessary_methods :initial, :intermed, :final
+end

Added: pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java Tue Apr  3 07:56:01 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This class is used to provide a free implementation of the EvalFunc exec function given
+ * implementation of the Accumulator interface. Instead of having to provide a redundant
+ * implementation, this provides the base exec function for free, given that the methods
+ * associated with the Accumulator interface are implemented. For information on how to
+ * implement Accumulator, see {@link Accumulator}.
+ */
+public abstract class AccumulatorEvalFunc<T> extends EvalFunc<T> implements Accumulator<T> {
+
+    @Override
+    public abstract void accumulate(Tuple b) throws IOException;
+
+    @Override
+    public abstract void cleanup();
+
+    @Override
+    public abstract T getValue();
+
+    /**
+     * Evaluates the whole input as a single accumulation: calls {@link #accumulate(Tuple)} once,
+     * then {@link #getValue()}, and always calls {@link #cleanup()} afterwards.
+     *
+     * @param input the tuple handed to {@link #accumulate(Tuple)}
+     * @return the value produced by {@link #getValue()}
+     * @throws IOException if {@link #accumulate(Tuple)} fails
+     */
+    @Override
+    public T exec(Tuple input) throws IOException {
+        try {
+            accumulate(input);
+            return getValue();
+        } finally {
+            // Reset accumulator state even on failure, so the next exec() starts clean.
+            cleanup();
+        }
+    }
+
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java Tue Apr  3 07:56:01 2012
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.counters.PigCounterHelper;
+
+/**
+ * This class is used to provide a free implementation of the Accumulator interface
+ * and EvalFunc class in the case of an Algebraic function. Instead of having to provide
+ * redundant implementations for Accumulator and EvalFunc, implementing the
+ * getInitial, getIntermed, and getFinal methods (which implies implementing the static classes
+ * they reference) will give you an implementation of each of those for free. <br><br>
+ * One key thing to note is that if a subclass of AlgebraicEvalFunc wishes to use any constructor
+ * arguments, it MUST call super(args).
+ * <br><br>
+ * IMPORTANT: the implementation of the Accumulator interface that this class provides is good,
+ * but it is simulated. For maximum efficiency, it is important to manually implement the accumulator
+ * interface. See {@link Accumulator} for more information on how to do so.
+ */
+public abstract class AlgebraicEvalFunc<T> extends AccumulatorEvalFunc<T> implements Algebraic {
+    private EvalFunc<Tuple> initEvalFunc;
+    private EvalFunc<Tuple> intermedEvalFunc;
+    private EvalFunc<T> finalEvalFunc;
+
+    private static final BagFactory mBagFactory = BagFactory.getInstance();
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /** Outputs of the Intermediate EvalFunc collected across accumulate() calls. */
+    private DataBag intermediateDB;
+    /** Single-tuple bag handed to the Initial EvalFunc, one input tuple at a time. */
+    private DataBag wrapDB;
+    /** Initial EvalFunc outputs for the current accumulate() batch. */
+    private DataBag accumDB;
+
+    /** Argument tuple whose only field is {@code wrapDB}. */
+    private Tuple argTuple;
+    /** Argument tuple whose only field is {@code accumDB}. */
+    private Tuple accumTup;
+
+    /**
+     * This represents the  number of elements an intermediate bag must have
+     * in order for the intermediate EvalFunc to be used on it.
+     */
+    private int bagCombineThreshold = 1000;
+
+    /**
+     * This represents the factor by which the result of applying the intermediate
+     * EvalFunc must shrink the data in order for combining at this stage to be
+     * deemed "worth it."
+     */
+    private int combineFactor = 2;
+
+    private PigCounterHelper pigCounterHelper = new PigCounterHelper();
+
+    /** Whether re-combining intermediateDB is still considered worthwhile. */
+    private boolean combine;
+
+    private String[] constructorArgs;
+
+    /**
+     * It is key that if a subclass has a constructor, that it calls super(args...) or else
+     * this class will not instantiate properly.
+     */
+    public AlgebraicEvalFunc(String... constructorArgs) { this.constructorArgs = constructorArgs; }
+
+    /**
+     * This must be implement as per a normal Algebraic interface. See {@link Algebraic} for
+     * more information.
+     */
+    @Override
+    public abstract String getFinal();
+
+    /**
+     * This must be implement as per a normal Algebraic interface. See {@link Algebraic} for
+     * more information.
+     */
+    @Override
+    public abstract String getInitial();
+
+    /**
+     * This must be implement as per a normal Algebraic interface. See {@link Algebraic} for
+     * more information.
+     */
+    @Override
+    public abstract String getIntermed();
+
+    /** True once the helper bags, tuples and EvalFuncs have been created by accumulate(). */
+    private boolean init = false;
+
+    /**
+     * This helper function instantiates an EvalFunc given its String class name, passing this
+     * object's constructor arguments as a spec of the form {@code base('a1','a2')}.
+     * NOTE(review): arguments are wrapped in single quotes without escaping, so an argument
+     * containing a quote would yield a malformed spec.
+     */
+    private EvalFunc<?> makeEvalFunc(String base) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(base).append("(");
+
+        boolean first = true;
+        for (String s : constructorArgs) {
+            if (!first) sb.append(",");
+            else first = false;
+            sb.append("'").append(s).append("'");
+        }
+
+        sb.append(")");
+
+        return (EvalFunc<?>)PigContext.instantiateFuncFromSpec(sb.toString());
+    }
+
+    /**
+     * This is the free accumulate implementation based on the static classes provided
+     * by the Algebraic static classes. This implemention works by leveraging the
+     * initial, intermediate, and final classes provided by the algebraic interface.
+     * The exec function of the Initial EvalFunc will be called on every Tuple of the input
+     * and the output will be collected in an intermediate state. Periodically, this intermediate
+     * state will have the Intermediate EvalFunc called on it 1 or more times. The Final EvalFunc
+     * is not called until getValue() is called.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void accumulate(Tuple input) throws IOException {
+        if (!init) {
+            intermediateDB = mBagFactory.newDefaultBag();
+            wrapDB = mBagFactory.newDefaultBag();
+            accumDB = mBagFactory.newDefaultBag();
+
+            argTuple = mTupleFactory.newTuple(1);
+            argTuple.set(0, wrapDB);
+
+            accumTup = mTupleFactory.newTuple(1);
+            accumTup.set(0,accumDB);
+
+            initEvalFunc = (EvalFunc<Tuple>)makeEvalFunc(getInitial());
+            intermedEvalFunc = (EvalFunc<Tuple>)makeEvalFunc(getIntermed());
+            finalEvalFunc = (EvalFunc<T>)makeEvalFunc(getFinal());
+
+            combine = true;
+            init = true;
+        }
+
+        accumDB.clear();
+
+        // Run the Initial EvalFunc on each input tuple, wrapped in a one-element bag.
+        for (Tuple t : (DataBag)input.get(0)) {
+            wrapDB.clear();
+            wrapDB.add(t);
+            accumDB.add(initEvalFunc.exec(argTuple));
+        }
+
+        // Reduce this batch with the Intermediate EvalFunc; re-combine the collected
+        // intermediates once they exceed the threshold, as long as that keeps paying off.
+        intermediateDB.add(intermedEvalFunc.exec(accumTup));
+        if (combine && intermediateDB.size() > bagCombineThreshold) {
+           long initialSizeEstimate = intermediateDB.getMemorySize();
+           DataBag newIntermediateDB = mBagFactory.newDefaultBag();
+           Tuple t = mTupleFactory.newTuple(1);
+           t.set(0, intermediateDB);
+           newIntermediateDB.add(intermedEvalFunc.exec(t));
+           intermediateDB = newIntermediateDB;
+           long newSizeEstimate = intermediateDB.getMemorySize();
+           pigCounterHelper.incrCounter("AlgebraicEvalFunc", "InitialSizeEst", initialSizeEstimate);
+           pigCounterHelper.incrCounter("AlgebraicEvalFunc", "PostCombineSizeEst", newSizeEstimate);
+           pigCounterHelper.incrCounter("AlgebraicEvalFunc", "CombineApply", 1L);
+           // Stop combining if it did not shrink the data by at least combineFactor.
+           if (combineFactor * newSizeEstimate > initialSizeEstimate) {
+             combine = false;
+             pigCounterHelper.incrCounter("AlgebraicEvalFunc", "CombineShutoff", 1L);
+           }
+        }
+    }
+
+    /**
+     * Per the Accumulator interface, this clears all of the variables used in the implementation.
+     */
+    @Override
+    public void cleanup() {
+        intermediateDB = null;
+        wrapDB = null;
+        accumDB = null;
+
+        argTuple = null;
+        accumTup = null;
+
+        initEvalFunc = null;
+        intermedEvalFunc = null;
+        finalEvalFunc = null;
+
+        init = false;
+    }
+
+    /**
+     * This function returns the ultimate result. It is when getValue() is called that
+     * the Final EvalFunc's exec function is called on the accumulated data.
+     *
+     * @throws IllegalStateException if called before any accumulate() (or after cleanup())
+     * @throws RuntimeException wrapping the IOException raised by the Final EvalFunc
+     */
+    @Override
+    public T getValue() {
+        if (!init) {
+            throw new IllegalStateException("AlgebraicEvalFunc.getValue() called before accumulate()");
+        }
+        try {
+            return finalEvalFunc.exec(mTupleFactory.newTuple(intermediateDB));
+        } catch (IOException e) {
+            throw new RuntimeException("Error in AlgebraicEvalFunc evaluating final method", e);
+        }
+    }
+
+}

Modified: pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java Tue Apr  3 07:56:01 2012
@@ -126,10 +126,28 @@ public class JarManager {
             mergeJar(jarFile, extraJar, null, contents);
         }
         for (String path: pigContext.scriptFiles) {
-        	addStream(jarFile, path, new FileInputStream(new File(path)),contents);
+            InputStream stream = null;
+            if (new File(path).exists()) {
+                stream = new FileInputStream(new File(path));
+            } else {
+                stream = PigContext.getClassLoader().getResourceAsStream(path);
+            }
+            if (stream==null) {
+                throw new IOException("Cannot find " + path);
+            }
+            addStream(jarFile, path, stream, contents);
         }
         for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
-        	addStream(jarFile, entry.getKey(), new FileInputStream(entry.getValue()),contents);
+            InputStream stream = null;
+            if (entry.getValue().exists()) {
+                stream = new FileInputStream(entry.getValue());
+            } else {
+                stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
+            }
+            if (stream==null) {
+                throw new IOException("Cannot find " + entry.getValue().getPath());
+            }
+            addStream(jarFile, entry.getKey(), stream, contents);
         }
         
         jarFile.putNextEntry(new ZipEntry("pigContext"));

Modified: pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java Tue Apr  3 07:56:01 2012
@@ -47,8 +47,7 @@ public abstract class ScriptEngine {
     public static enum SupportedScriptLang {
 
         // possibly jruby in the future
-        //jruby(new String[]{}, new String[]{}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
-        
+        jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
         jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"), 
         javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine");
         
@@ -122,7 +121,8 @@ public abstract class ScriptEngine {
      * @return a stream (it is the responsibility of the caller to close it)
      * @throws IllegalStateException if we could not open a stream
      */
-    protected static InputStream getScriptAsStream(String scriptPath) {
+    public static InputStream getScriptAsStream(String scriptPath) {
+    //protected static InputStream getScriptAsStream(String scriptPath) {
         InputStream is = null;
         File file = new File(scriptPath);
         if (file.exists()) {

Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java Tue Apr  3 07:56:01 2012
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.scripting.jruby.JrubyScriptEngine.RubyFunctions;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.jruby.Ruby;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.runtime.builtin.IRubyObject;
+
+/**
+ * This class provides a bridge between Ruby classes that extend AccumulatorPigUdf
+ * and their execution in Pig. Each call to accumulate passes a Bag of data to the Ruby
+ * "exec" method, and the final value is obtained by calling "get" on the class instance
+ * that receives method calls.
+ */
+public class JrubyAccumulatorEvalFunc extends AccumulatorEvalFunc<Object> {
+    private Object methodReceiver;
+    private Object classObject;
+    private boolean isInitialized = false;
+    private String path;
+    private String methodName;
+
+    private static final ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
+    private static final Ruby ruby = rubyEngine.getProvider().getRuntime();
+
+    // It is meaningless to instantiate this class without a path and a method
+    private JrubyAccumulatorEvalFunc() {}
+
+    /**
+     * This constructor is used by JrubyScriptEngine to register a Ruby class as an Accumulator.
+     * The path and methodName are used to find the Ruby class, which is then instantiated and used.
+     *
+     * @param path the path of the Ruby script the class was registered from
+     * @param methodName the key under which the Ruby class was registered
+     */
+    public JrubyAccumulatorEvalFunc(String path, String methodName) {
+        this.path = path;
+        this.methodName = methodName;
+    }
+
+    /**
+     * Initializes the object that receives method calls, and the class object that holds
+     * schema information. It is split out so that every entry point (accumulate, getValue,
+     * outputSchema) can initialize lazily. The class object is pulled from the Ruby script
+     * registered at the path via the RubyFunctions helper methods.
+     */
+    private void initialize() {
+        classObject = RubyFunctions.getFunctions("accumulator", path).get(methodName);
+        methodReceiver = rubyEngine.callMethod(classObject, "new");
+        isInitialized = true;
+    }
+
+    /**
+     * Uses the "exec" method required of AccumulatorPigUdf Ruby classes. The DataBag in the
+     * first field of the given Tuple is wrapped as a Ruby DataBag and streamed through the exec
+     * method defined on the registered class.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        if (!isInitialized) {
+            initialize();
+        }
+        RubyDataBag db = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)b.get(0));
+        rubyEngine.callMethod(methodReceiver, "exec", db, IRubyObject.class);
+    }
+
+    /**
+     * Drops the current Ruby receiver so the next call re-creates a fresh instance.
+     */
+    @Override
+    public void cleanup() {
+        isInitialized = false;
+        methodReceiver = null;
+    }
+
+   /**
+    * Calls "get" on the AccumulatorPigUdf Ruby class that was specified and converts the
+    * result to a Pig value. If no receiver exists yet (e.g. getValue is called right after
+    * cleanup or without a prior accumulate), a fresh instance is created first instead of
+    * invoking "get" on a null receiver.
+    *
+    * @throws RuntimeException if the Ruby result cannot be converted to a Pig value
+    */
+    @Override
+    public Object getValue()  {
+        if (!isInitialized) {
+            initialize();
+        }
+        IRubyObject rubyResult = rubyEngine.callMethod(methodReceiver, "get", IRubyObject.class);
+        try {
+            return PigJrubyLibrary.rubyToPig(rubyResult);
+        } catch (ExecException e) {
+            throw new RuntimeException("Unable to convert result from Ruby to Pig: " + rubyResult, e);
+        }
+    }
+
+    /**
+     * Provides the Schema of the output by delegating to the get_output_schema function on the
+     * class object defined on the Ruby side.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        if (!isInitialized) {
+            initialize();
+        }
+        RubySchema rs = PigJrubyLibrary.pigToRuby(ruby, input);
+        return PigJrubyLibrary.rubyToPig(rubyEngine.callMethod(classObject, "get_output_schema", rs, RubySchema.class));
+    }
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java Tue Apr  3 07:56:01 2012
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.scripting.jruby.JrubyScriptEngine.RubyFunctions;
+
+import org.jruby.Ruby;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.runtime.builtin.IRubyObject;
+
+/**
+ * This class provides the bridge between Ruby classes that extend the AlgebraicPigUdf
+ * "interface" by implementing an initial, intermed, and final method. Unlike EvalFuncs
+ * and Accumulators, the type must be known at compile time (ie it can't return Object),
+ * as Pig inspects the type and ensures that it is valid. This is why class specific
+ * shells are provided at the bottom. This class leverages AlgebraicEvalFunc to provide
+ * the Accumulator and EvalFunc implementations.
+ */
+public abstract class JrubyAlgebraicEvalFunc<T> extends AlgebraicEvalFunc<T> {
+    protected static BagFactory mBagFactory = BagFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    protected static final ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
+    protected static final Ruby ruby = rubyEngine.getProvider().getRuntime();
+
+    // It makes no sense to instantiate this without arguments
+    private JrubyAlgebraicEvalFunc() {}
+
+    /**
+     * The constructor takes information on the script and method being invoked and registers it with the
+     * superclass (which is necessary for AlgebraicEvalFunc).
+     */
+    public JrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName, functionName); }
+
+    /**
+     * This class invokes the initial method on the given Ruby class. As a courtesy to the user, it unwraps the
+     * DataBag that is given to Initial and passes Ruby just the first Tuple that it contains, as the contract
+     * for the Initial function in Algebraic is that it is given a DataBag with one and only one Tuple. Finally,
+     * it wraps the Ruby output in a Tuple.
+     */
+    public static class Initial extends AlgebraicFunctionWrapper<Tuple> {
+        public Initial() {}
+
+        public Initial(String fileName, String functionName) { super(fileName, functionName, "initial"); }
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            if (!isInitialized())
+                initialize();
+
+            try {
+                IRubyObject inp = PigJrubyLibrary.pigToRuby(ruby, ((DataBag)input.get(0)).iterator().next().get(0));
+                IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
+                return mTupleFactory.newTuple(PigJrubyLibrary.rubyToPig(rubyResult));
+            } catch (Exception e) {
+                throw new IOException("Error executing initial function",  e);
+            }
+        }
+    }
+
+    /**
+     * This class invokes the intermed method on the given Ruby class. It passes along the DataBag contained
+     * in the Tuple it is given, and wraps the Ruby output in a Tuple.
+     */
+    public static class Intermed extends AlgebraicFunctionWrapper<Tuple> {
+        public Intermed() {}
+
+        public Intermed(String fileName, String functionName) { super(fileName, functionName, "intermed"); }
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            if (!isInitialized())
+                initialize();
+
+            try {
+                RubyDataBag inp = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)input.get(0));
+                IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
+                return mTupleFactory.newTuple(PigJrubyLibrary.rubyToPig(rubyResult));
+            } catch (Exception e) {
+                throw new IOException("Error executing intermediate function: ",  e);
+            }
+        }
+    }
+
+    /**
+     * This class invokes the final method on the given Ruby class. It passes along the DataBag contained
+     * in the Tuple it is given, and the raw result.
+     */
+    public static class Final<T> extends AlgebraicFunctionWrapper<T> {
+        public Final() {}
+
+        public Final(String fileName, String functionName) { super(fileName, functionName, "final"); }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T exec(Tuple input) throws IOException {
+            if (!isInitialized())
+                initialize();
+
+            try {
+                RubyDataBag inp = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)input.get(0));
+                IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
+                return (T)PigJrubyLibrary.rubyToPig(rubyResult);
+            } catch (Exception e) {
+                throw new IOException("Error executing final function",  e);
+            }
+        }
+    }
+
+    /**
+     * This is a lightweight wrapper shell that registers information on the method being called,
+     * and provides the initializer that the static Algebraic classes (Initial, Intermed, Final)
+     * will use to execute.
+     */
+    public static abstract class AlgebraicFunctionWrapper<T> extends EvalFunc<T> {
+        private String fileName;
+        private String functionName;
+
+        protected Object receiver;
+        protected boolean isInitialized = false;
+
+        protected String stage;
+
+        public String getStage() { return stage; }
+        public Object getReceiver() { return receiver; }
+        public String getFileName() { return fileName; }
+        public String getFunctionName() { return functionName; }
+
+        public AlgebraicFunctionWrapper() {}
+
+        /**
+         * In addition to registering the fileName and the functionName (which are given based on the
+         * arguments passed to super() in the containing class's constructor, each extending class
+         * will register their "stage," which will serve as the method to invoke on the Ruby class.
+         */
+        public AlgebraicFunctionWrapper(String fileName, String functionName, String stage) {
+            this.fileName = fileName;
+            this.functionName = functionName;
+            this.stage = stage;
+        }
+
+        public boolean isInitialized() { return isInitialized; }
+
+        public void initialize() {
+            receiver = rubyEngine.callMethod(RubyFunctions.getFunctions("algebraic", fileName).get(functionName), "new");
+            isInitialized = true;
+        }
+
+        @Override
+        public abstract T exec(Tuple input) throws IOException;
+    }
+
+    @Override
+    public abstract String getFinal();
+
+    @Override
+    public String getInitial() { return Initial.class.getName(); }
+
+    @Override
+    public String getIntermed() { return Intermed.class.getName(); }
+
+    /**
+     * Unlike EvalFuncs and Accumulators, the type must be known at compile time (ie it
+     * can't return Object), as Pig inspects the type and ensures that it is valid. This
+     * is why class specific shells are provided here. This is also the reason why the
+     * Ruby Algebraic interface is the only interface that does not currently allow overriding
+     * outputSchema, and a fixed one must be provided.
+     */
+    public static class BagJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<DataBag> {
+        public BagJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<DataBag> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class ChararrayJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<String> {
+        public ChararrayJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<String> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class DataByteArrayJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<DataByteArray> {
+        public DataByteArrayJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<DataByteArray> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class DoubleJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Double> {
+        public DoubleJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<Double> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class FloatJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Float> {
+        public FloatJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<Float> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class IntegerJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Integer> {
+        public IntegerJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<Integer> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class LongJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Long> {
+
+        public LongJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<Long> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class MapJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Map<?,?>> {
+        public MapJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<Map<?,?>> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+
+    public static class TupleJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Tuple> {
+        public TupleJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+        @Override
+        public String getFinal() { return Final.class.getName(); }
+
+        public static class Final extends JrubyAlgebraicEvalFunc.Final<Tuple> {
+            public Final() {}
+            public Final(String fileName, String functionName) { super(fileName,functionName); }
+        }
+    }
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java Tue Apr  3 07:56:01 2012
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.backend.executionengine.ExecException;
+
+import org.jruby.Ruby;
+import org.jruby.RubyArray;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.runtime.builtin.IRubyObject;
+
+/**
+ * This class serves as the bridge between Ruby methods that
+ * are registered with and extend PigUdf, and their execution in
+ * Pig. An instance of the containing class is created, and the
+ * method name will be called against that instance. If a
+ * schema function is associated, then when outputSchema is called,
+ * that function will be given the input Schema and its output will
+ * be given to Pig.
+ */
+public class JrubyEvalFunc extends EvalFunc<Object> {
+
+    private String methodName;
+    private long numRequiredArgs;
+    // -1 is treated as "any number of optional arguments" by the arity check in exec.
+    private long numOptionalArgs;
+
+    private String fileName;
+    private String functionName;
+
+    private boolean isInitialized = false;
+
+    private Object funcReceiver;
+    private Object funcInfoEncapsulator;
+
+    private static final ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
+    private static final Ruby ruby = rubyEngine.getProvider().getRuntime();
+
+    // Instantiating this class without arguments is meaningless
+    private JrubyEvalFunc() {}
+
+    /**
+     * The constructor for this class registers the filename of the Ruby script
+     * and the name of the function we care about. The difference between function name
+     * and method name is that the former is the name that was used to register the function,
+     * whereas the method name is the actual method that must be invoked. The two are often
+     * the same, but not always. functionName is the key to the list of EvalFuncs that
+     * are registered.
+     *
+     * @param filename the Ruby script the function was registered from
+     * @param functionName the registration key of the function
+     */
+    public JrubyEvalFunc(String filename, String functionName) throws IOException {
+        this.fileName = filename;
+        this.functionName = functionName;
+    }
+
+    /**
+     * Initializes the objects necessary to evaluate the Ruby method on the Pig side. Using
+     * the object that was saved to the functionName key by Ruby, this gets an instance of the
+     * class that will receive method calls, the method name, and information on the arity.
+     */
+    private void initialize() {
+        funcInfoEncapsulator = JrubyScriptEngine.RubyFunctions.getFunctions("evalfunc", fileName).get(functionName);
+
+        funcReceiver = rubyEngine.callMethod(funcInfoEncapsulator, "get_receiver");
+        methodName = rubyEngine.callMethod(funcInfoEncapsulator, "method_name", String.class);
+        numRequiredArgs = rubyEngine.callMethod(funcInfoEncapsulator, "required_args", Long.class);
+        numOptionalArgs = rubyEngine.callMethod(funcInfoEncapsulator, "optional_args", Long.class); //TODO support varargs?
+
+        isInitialized = true;
+    }
+
+    /**
+     * Passes the fields of the tuple argument to the Ruby method and converts the result back to Pig.
+     * A null tuple, or a method declaring no arguments at all, results in a call with no arguments.
+     *
+     * @throws IOException if the argument count does not fit the method's arity, or if the Ruby
+     *         call or the conversion of its result fails
+     */
+    @Override
+    public Object exec(Tuple tuple) throws IOException {
+        if (!isInitialized) {
+            initialize();
+        }
+
+        try {
+            IRubyObject rubyResult;
+            if (tuple == null || (numRequiredArgs == 0 && numOptionalArgs == 0)) {
+                rubyResult = rubyEngine.callMethod(funcReceiver, methodName, IRubyObject.class);
+            } else {
+                Object[] args = PigJrubyLibrary.pigToRuby(ruby, tuple).toArray();
+                boolean enoughArgs = args.length >= numRequiredArgs;
+                boolean notTooMany = numOptionalArgs == -1 || args.length <= numRequiredArgs + numOptionalArgs;
+                if (!(enoughArgs && notTooMany)) {
+                    String s = "Method " + methodName + " requires " + numRequiredArgs + " arguments and ";
+                    s += (numOptionalArgs == -1 ? "unlimited" : String.valueOf(numOptionalArgs)) + " optional arguments. ";
+                    s += "Instead, " + args.length + " arguments given.";
+                    // Wrapped into an IOException by the catch below, as before.
+                    throw new IllegalArgumentException(s);
+                }
+                rubyResult = rubyEngine.callMethod(funcReceiver, methodName, args, IRubyObject.class);
+            }
+            return PigJrubyLibrary.rubyToPig(rubyResult);
+        } catch (Exception e) {
+            throw new IOException("Error executing function",  e);
+        }
+    }
+
+    /**
+     * Uses the schema method of the function encapsulation object to get the Schema information for
+     * the Ruby method, given the input Schema and the receiver instance.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        if (!isInitialized) {
+            initialize();
+        }
+        RubySchema rs = PigJrubyLibrary.pigToRuby(ruby, input);
+        return PigJrubyLibrary.rubyToPig(rubyEngine.callMethod(funcInfoEncapsulator, "schema", new Object[]{rs, funcReceiver}, RubySchema.class));
+    }
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java Tue Apr  3 07:56:01 2012
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.tools.pigstats.PigStats;
+
+import org.jruby.CompatVersion;
+import org.jruby.Ruby;
+import org.jruby.RubyArray;
+import org.jruby.RubyBoolean;
+import org.jruby.embed.LocalContextScope;
+import org.jruby.embed.LocalVariableBehavior;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.javasupport.JavaEmbedUtils.EvalUnit;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of the script engine for Jruby, which facilitates the registration
+ * of scripts as UDFs, and also provides information (via the nested class RubyFunctions)
+ * on the registered functions.
+ * <p>
+ * All scripts are evaluated in one static {@link ScriptingContainer} shared by every instance of this class.
+ */
+public class JrubyScriptEngine extends ScriptEngine {
+    private static final Log LOG = LogFactory.getLog(JrubyScriptEngine.class);
+
+    //TODO test if it is necessary to have a per script (or even per method) runtime. PRO: avoid collisions CON: a bunch of runtimes, which could be slow
+    /** Shared JRuby container: single-threaded context scope, persistent local variables, Ruby 1.9 mode. */
+    protected static final ScriptingContainer rubyEngine;
+
+    /** Whether this engine instance has already added the JRuby jar and pigudf.rb to the PigContext. */
+    private boolean isInitialized = false;
+
+    static {
+        rubyEngine = new ScriptingContainer(LocalContextScope.SINGLETHREAD, LocalVariableBehavior.PERSISTENT);
+        rubyEngine.setCompatVersion(CompatVersion.RUBY1_9);
+    }
+
+    /**
+     * This is a static class which provides functionality around the functions that are registered with Pig.
+     */
+    static class RubyFunctions {
+        /**
+         * This cache maps function type to a map that maps path to a map of function name to the object
+         * which contains information about that function. In the case of an EvalFunc, there is a special
+         * function which encapsulates information about the function. In the case of an Accumulator or
+         * Algebraic, it is just an instance of the Class object that extends AccumulatorPigUdf or
+         * AlgebraicPigUdf, respectively.
+         */
+        private static Map<String, Map<String, Map<String, Object>>> functionsCache = Maps.newHashMap();
+
+        /** Records, per script path, whether that script has already been evaluated in the shared container. */
+        private static Map<String, Boolean> alreadyRunCache = Maps.newHashMap();
+
+        /** Maps each function type to the Ruby expression that returns the functions of that type. */
+        private static Map<String, String> cacheFunction = Maps.newHashMap();
+
+        static {
+            //TODO use an enum instead?
+            cacheFunction.put("evalfunc", "PigUdf.get_functions_to_register");
+            cacheFunction.put("accumulator", "AccumulatorPigUdf.classes_to_register");
+            cacheFunction.put("algebraic", "AlgebraicPigUdf.classes_to_register");
+
+            functionsCache.put("evalfunc", new HashMap<String,Map<String,Object>>());
+            functionsCache.put("accumulator", new HashMap<String,Map<String,Object>>());
+            functionsCache.put("algebraic", new HashMap<String,Map<String,Object>>());
+        }
+
+        /**
+         * Returns the cached function map for a script, evaluating the script first if it has not been run yet.
+         * Whenever the script is (re)evaluated, every cache's entry for that path is discarded first.
+         *
+         * @param path path of the Ruby script
+         * @param cacheToUpdate the per-type cache to read from and populate on a miss
+         * @param regCommand Ruby expression evaluated on a cache miss to obtain the function map
+         * @return map of function name to the Ruby object describing that function
+         */
+        @SuppressWarnings("unchecked")
+        private static Map<String,Object> getFromCache(String path, Map<String,Map<String,Object>> cacheToUpdate, String regCommand) {
+            Boolean runCheck = alreadyRunCache.get(path);
+            if (runCheck == null || !runCheck.booleanValue()) {
+                for (Map.Entry<String, Map<String, Map<String, Object>>> entry : functionsCache.entrySet()) {
+                    entry.getValue().remove(path);
+                }
+
+                runScript(path);
+
+                alreadyRunCache.put(path, true);
+            }
+
+            Map<String,Object> funcMap = cacheToUpdate.get(path);
+
+            if (funcMap == null) {
+                funcMap = (Map<String,Object>)rubyEngine.runScriptlet(regCommand);
+                cacheToUpdate.put(path, funcMap);
+            }
+
+            return funcMap;
+        }
+
+        /**
+         * Evaluates the script at the given path in the shared container. The script stream is always
+         * closed afterwards (it was previously leaked).
+         */
+        private static void runScript(String path) {
+            InputStream script = getScriptAsStream(path);
+            try {
+                rubyEngine.runScriptlet(script, path);
+            } finally {
+                if (script != null) {
+                    try {
+                        script.close();
+                    } catch (IOException e) {
+                        LOG.warn("Unable to close script " + path, e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Returns the functions of the given type defined by the script at {@code path}.
+         *
+         * @param cache one of "evalfunc", "accumulator" or "algebraic"
+         * @param path path of the Ruby script
+         * @return map of function name to the Ruby object describing that function
+         * @throws IllegalArgumentException if {@code cache} is not a known function type
+         */
+        public static Map<String, Object> getFunctions(String cache, String path) {
+            Map<String, Map<String, Object>> cacheToUpdate = functionsCache.get(cache);
+            if (cacheToUpdate == null) {
+                throw new IllegalArgumentException("Unknown function cache type: " + cache);
+            }
+            return getFromCache(path, cacheToUpdate, cacheFunction.get(cache));
+        }
+    }
+
+    /**
+     * Evaluates the script containing ruby udfs to determine what udfs are defined as well as
+     * what libraries and other external resources are necessary. These libraries and resources
+     * are then packaged with the job jar itself.
+     *
+     * @param path path of the Ruby script
+     * @param namespace prefix under which each function is registered ("namespace.method")
+     * @param pigContext context that receives the function registrations and shipped files
+     * @throws IOException if an Algebraic UDF's output schema type cannot be mapped to an EvalFunc
+     */
+    @Override
+    public void registerFunctions(String path, String namespace, PigContext pigContext) throws IOException {
+        if (!isInitialized) {
+            pigContext.scriptJars.add(getJarPath(Ruby.class));
+            pigContext.addScriptFile("pigudf.rb", "pigudf.rb");
+            isInitialized = true;
+        }
+
+        for (Map.Entry<String,Object> entry : RubyFunctions.getFunctions("evalfunc", path).entrySet()) {
+            String method = entry.getKey();
+
+            FuncSpec funcspec = new FuncSpec(JrubyEvalFunc.class.getCanonicalName() + "('" + path + "','" + method +"')");
+            pigContext.registerFunction(namespace + "." + method, funcspec);
+        }
+
+        for (Map.Entry<String,Object> entry : RubyFunctions.getFunctions("accumulator", path).entrySet()) {
+            String method = entry.getKey();
+
+            if (rubyEngine.callMethod(entry.getValue(), "check_if_necessary_methods_present", RubyBoolean.class).isFalse()) {
+                throw new RuntimeException("Method " + method + " does not have all of the required methods present!");
+            }
+
+            pigContext.registerFunction(namespace + "." + method, new FuncSpec(JrubyAccumulatorEvalFunc.class.getCanonicalName() + "('" + path + "','" + method +"')"));
+        }
+
+        for (Map.Entry<String,Object> entry : RubyFunctions.getFunctions("algebraic", path).entrySet()) {
+            String method = entry.getKey();
+
+            if (rubyEngine.callMethod(entry.getValue(), "check_if_necessary_methods_present", RubyBoolean.class).isFalse()) {
+                throw new RuntimeException("Method " + method + " does not have all of the required methods present!");
+            }
+
+            Schema schema = PigJrubyLibrary.rubyToPig(rubyEngine.callMethod(entry.getValue(), "get_output_schema", RubySchema.class));
+            String canonicalName = JrubyAlgebraicEvalFunc.class.getCanonicalName() + "$";
+
+            // In the case of an Algebraic UDF, a type specific EvalFunc is necessary (ie not EvalFunc<Object>), so we
+            // inspect the type and instantiate the proper class.
+            switch (schema.getField(0).type) {
+                case DataType.BAG: canonicalName += "Bag"; break;
+                case DataType.TUPLE: canonicalName += "Tuple"; break;
+                case DataType.CHARARRAY: canonicalName += "Chararray"; break;
+                case DataType.DOUBLE: canonicalName += "Double"; break;
+                case DataType.FLOAT: canonicalName += "Float"; break;
+                case DataType.INTEGER: canonicalName += "Integer"; break;
+                case DataType.LONG: canonicalName += "Long"; break;
+                case DataType.MAP: canonicalName += "Map"; break;
+                case DataType.BYTEARRAY: canonicalName += "DataByteArray"; break;
+                default: throw new ExecException("Unable to instantiate Algebraic EvalFunc " + method + " as schema type is invalid");
+            }
+
+            canonicalName += "JrubyAlgebraicEvalFunc";
+
+            pigContext.registerFunction(namespace + "." + method, new FuncSpec(canonicalName + "('" + path + "','" + method +"')"));
+        }
+
+        // Determine what external dependencies to ship with the job jar
+        HashSet<String> toShip = libsToShip();
+        for (String lib : toShip) {
+            File libFile = new File(lib);
+            if (lib.endsWith(".rb")) {
+                pigContext.addScriptFile(lib);
+            } else if (libFile.isDirectory()) {
+                // Need to package entire contents of directory
+                List<File> files = listRecursively(libFile);
+                for (File file : files) {
+                    if (file.isDirectory()) {
+                        continue;
+                    } else if (file.getName().endsWith(".jar") || file.getName().endsWith(".zip")) {
+                        pigContext.scriptJars.add(file.getPath());
+                    } else {
+                        // Strip the directory prefix literally; replaceFirst() would treat the path as a regex.
+                        String localPath = libFile.getName() + stripPrefix(file.getPath(), libFile.getPath());
+                        pigContext.addScriptFile(localPath, file.getPath());
+                    }
+                }
+            } else {
+                pigContext.scriptJars.add(lib);
+            }
+        }
+    }
+
+    /** Returns {@code path} without its leading {@code prefix}, or {@code path} unchanged if it does not start with it. */
+    private static String stripPrefix(String path, String prefix) {
+        return path.startsWith(prefix) ? path.substring(prefix.length()) : path;
+    }
+
+    /**
+     * Consults the scripting container, after the script has been evaluated, to
+     * determine what dependencies to ship.
+     * <p>
+     * Each loaded library ({@code $"}) is resolved against the current directory first and then each
+     * {@code $LOAD_PATH} entry; the first existing match is shipped. pigudf.rb is skipped.
+     * <p>
+     * FIXME: Corner cases like the following: "def foobar; require 'json'; end"
+     * are NOT dealt with using this method
+     */
+    private HashSet<String> libsToShip() {
+        RubyArray loadedLibs = (RubyArray)rubyEngine.get("$\"");
+        RubyArray rubyLoadPath = (RubyArray)rubyEngine.get("$LOAD_PATH");
+
+        // Current directory first. Search a local copy so the interpreter's global $LOAD_PATH
+        // is left untouched (mutating it would prepend another "" on every call).
+        List<Object> loadPaths = new ArrayList<Object>();
+        loadPaths.add("");
+        for (Object loadPath : rubyLoadPath) {
+            loadPaths.add(loadPath);
+        }
+
+        HashSet<String> toShip = new HashSet<String>();
+        HashSet<Object> shippedLib = new HashSet<Object>();
+
+        for (Object loadPath : loadPaths) {
+            for (Object lib : loadedLibs) {
+                if (lib.toString().equals("pigudf.rb")) {
+                    continue;
+                }
+                if (shippedLib.contains(lib)) {
+                    continue;
+                }
+                String possiblePath = (loadPath.toString().isEmpty()?"":loadPath.toString() +
+                        File.separator) + lib.toString();
+                if ((new File(possiblePath)).exists()) {
+                    // remove prefix ./
+                    toShip.add(possiblePath.startsWith("./")?possiblePath.substring(2):
+                        possiblePath);
+                    shippedLib.add(lib);
+                }
+            }
+        }
+        return toShip;
+    }
+
+    /**
+     * Lists every file and directory beneath {@code directory}, depth first, each directory
+     * followed by its own contents. Unreadable directories are logged and contribute nothing.
+     */
+    private static List<File> listRecursively(File directory) {
+        ArrayList<File> files = new ArrayList<File>();
+
+        File[] entries = directory.listFiles();
+        if (entries == null) {
+            // listFiles() returns null on an I/O error or when the path is not a directory.
+            LOG.warn("Unable to list contents of directory " + directory.getPath());
+            return files;
+        }
+
+        // Go over entries
+        for (File entry : entries) {
+            files.add(entry);
+            if (entry.isDirectory()) {
+                files.addAll(listRecursively(entry));
+            }
+        }
+        return files;
+    }
+
+    /** Running a Ruby file as a Pig control script is not supported by this engine. */
+    @Override
+    protected Map<String, List<PigStats>> main(PigContext pigContext, String scriptFile) throws IOException {
+        throw new UnsupportedOperationException("Unimplemented");
+    }
+
+    @Override
+    protected String getScriptingLang() {
+        return "jruby";
+    }
+
+    /** No script variables are exported to Pig; always returns a new empty map. */
+    @Override
+    protected Map<String, Object> getParamsFromVariables() throws IOException {
+        Map<String, Object> vars = Maps.newHashMap();
+        return vars;
+    }
+}