You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/04/03 09:56:02 UTC
svn commit: r1308736 [1/3] - in /pig/branches/branch-0.10: ./ bin/ ivy/
src/main/ src/main/jruby/ src/org/apache/pig/ src/org/apache/pig/impl/util/
src/org/apache/pig/scripting/ src/org/apache/pig/scripting/jruby/
src/org/apache/pig/tools/counters/ tes...
Author: daijy
Date: Tue Apr 3 07:56:01 2012
New Revision: 1308736
URL: http://svn.apache.org/viewvc?rev=1308736&view=rev
Log:
PIG-2317: Ruby/Jruby UDFs
Added:
pig/branches/branch-0.10/src/main/
pig/branches/branch-0.10/src/main/jruby/
pig/branches/branch-0.10/src/main/jruby/pigudf.rb
pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java
pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/PigJrubyLibrary.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/RubyDataBag.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/RubyDataByteArray.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/RubySchema.java
pig/branches/branch-0.10/src/org/apache/pig/tools/counters/
pig/branches/branch-0.10/src/org/apache/pig/tools/counters/PigCounterHelper.java
pig/branches/branch-0.10/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/AppendIndex.java
pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/
pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/morerubyudfs.rb
pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/scriptingudfs.rb
pig/branches/branch-0.10/test/e2e/pig/udfs/ruby/udf2.rb
pig/branches/branch-0.10/test/org/apache/pig/test/utils/HelperEvalFuncUtils.java
Modified:
pig/branches/branch-0.10/CHANGES.txt
pig/branches/branch-0.10/bin/pig
pig/branches/branch-0.10/build.xml
pig/branches/branch-0.10/ivy.xml
pig/branches/branch-0.10/ivy/libraries.properties
pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java
pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java
pig/branches/branch-0.10/test/e2e/pig/build.xml
pig/branches/branch-0.10/test/e2e/pig/conf/default.conf
pig/branches/branch-0.10/test/e2e/pig/conf/local.conf
pig/branches/branch-0.10/test/e2e/pig/drivers/TestDriverPig.pm
pig/branches/branch-0.10/test/e2e/pig/drivers/Util.pm
pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf
pig/branches/branch-0.10/test/org/apache/pig/test/TestUDF.java
Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Tue Apr 3 07:56:01 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2317: Ruby/Jruby UDFs (jcoveney via daijy)
+
PIG-2619: HBaseStorage constructs a Scan with cacheBlocks = false (andy lindeman via jcoveney)
PIG-1270: Push limit into loader (daijy)
Modified: pig/branches/branch-0.10/bin/pig
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/bin/pig?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/bin/pig (original)
+++ pig/branches/branch-0.10/bin/pig Tue Apr 3 07:56:01 2012
@@ -135,6 +135,15 @@ if [ -z "$JYTHON_JAR" ]; then
fi
fi
+JRUBY_JAR=`echo ${PIG_HOME}/lib/jruby-complete-*.jar`
+
+if [ -z "$JRUBY_JAR" ]; then
+ JRUBY_JAR=`echo $PIG_HOME/build/ivy/lib/Pig/jruby-complete-*.jar`
+ if [ -n "$JRUBY_JAR" ]; then
+ CLASSPATH=${CLASSPATH}:$JRUBY_JAR
+ fi
+fi
+
for f in $PIG_HOME/share/pig/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
Modified: pig/branches/branch-0.10/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/build.xml?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/build.xml (original)
+++ pig/branches/branch-0.10/build.xml Tue Apr 3 07:56:01 2012
@@ -589,6 +589,7 @@
</section>
</manifest>
<fileset file="${basedir}/conf/pig-default.properties" />
+ <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
</jar>
<!-- @depricated -->
<jar jarfile="${output.jarfile}" basedir="${build.classes}">
@@ -623,6 +624,7 @@
<include name="zookeeper*.jar"/>
</zipgroupfileset>
<fileset file="${basedir}/conf/pig-default.properties" />
+ <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
</jar>
<antcall target="include-meta" inheritRefs="true" inheritall="true"/>
<copy file="${output.jarfile}" tofile="${output.jarfile.backcompat}"/>
@@ -680,6 +682,7 @@
</section>
</manifest>
<fileset file="${basedir}/conf/pig-default.properties" />
+ <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
</jar>
<!-- @depricated -->
<jar jarfile="${output.jarfile.withouthadoop}" basedir="${build.classes}">
@@ -703,6 +706,7 @@
<include name="guava-${guava.version}.jar" />
</zipgroupfileset>
<fileset file="${basedir}/conf/pig-default.properties" />
+ <fileset file="${basedir}/src/main/jruby/pigudf.rb" />
</jar>
<copy file="${output.jarfile.withouthadoop}" tofile="${output.jarfile.backcompat.withouthadoop}"/>
</target>
Modified: pig/branches/branch-0.10/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/ivy.xml?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/ivy.xml (original)
+++ pig/branches/branch-0.10/ivy.xml Tue Apr 3 07:56:01 2012
@@ -195,6 +195,8 @@
<dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="compile->master"/>
<dependency org="org.jboss.netty" name="netty" rev="3.2.2.Final" conf="compile->master"/>
+ <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
+
<dependency org="org.apache.hbase" name="hbase" rev="${hbase.version}" conf="compile->master">
<artifact name="hbase" type="jar"/>
<artifact name="hbase" type="test-jar" ext="jar" m:classifier="tests"/>
Modified: pig/branches/branch-0.10/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/ivy/libraries.properties?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/ivy/libraries.properties (original)
+++ pig/branches/branch-0.10/ivy/libraries.properties Tue Apr 3 07:56:01 2012
@@ -52,6 +52,7 @@ joda-time.version=1.6
jsch.version=0.1.38
json-simple.version=1.1
junit.version=4.5
+jruby.version=1.6.7
jython.version=2.5.0
rhino.version=1.7R2
antlr.version=3.4
Added: pig/branches/branch-0.10/src/main/jruby/pigudf.rb
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/main/jruby/pigudf.rb?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/main/jruby/pigudf.rb (added)
+++ pig/branches/branch-0.10/src/main/jruby/pigudf.rb Tue Apr 3 07:56:01 2012
@@ -0,0 +1,421 @@
+throw "pigudf.rb only works under JRuby!" unless RUBY_PLATFORM=="java"
+require 'jruby'
+org.apache.pig.scripting.jruby.PigJrubyLibrary.new.load(JRuby.runtime, false)
+
+#TODO output_schema should accept a Schema object as well, and use Schema objects
+#TODO AccumulatorEvalFunc output_schema should not allow you to give a block and defined it
+
+# This is the base class for runy of the mill EvalFuncs. A class just serves to
+# contain similar jobs, as well as allow for method reuse. In the case of simple
+# EvalFuncs, each method will be turned into a UDF (though they do not have to be called).
+#
+# TODO: EXPLAIN SYNTAX
+
+class PigUdf
+
+ # Here we initialize the variables we'll be using at the class level (generally
+ # analogous to static in Java). The nice thing about this method is that these
+ # values are all set in the same way, even from children. Thus, all of the children
+ # will update PigUdf.@@functions_to_register, and on the Java side when we want to
+ # access this, we can return that Map. This means it is no longer necessary to keep
+ # track of descendent children, etc, since all that matters are the methods that
+ # are registered with subclasses.
+ #
+ # The @@class_object_to_name_and_add variable is used by self.evalfunc and self.filterfunc.
+ # See the documentation for the former to understand why it is necessary. @@schema holds
+ # the last schema given by output_schema or output_schema_function, see the documentation
+ # on output_schema for more.
+
+ @@functions_to_register = {}
+ @@class_object_to_name_and_add = nil
+ @@schema = nil
+
+ # See the documentation on self.evalfunc for why this is necessary. This takes the current class
+ # object and registers it. This is necessary because self.evalfunc has to return before .to_s
+ # will return something meaningful and not gibberish.
+
+ def self.name_and_add_class_object
+ if @@class_object_to_name_and_add
+ name = @@class_object_to_name_and_add.class_object.to_s
+ @@class_object_to_name_and_add.method_name = "eval"
+ @@functions_to_register[name] = @@class_object_to_name_and_add
+ end
+ @@class_object_to_name_and_add = nil
+ end
+
+ # This is the core function that registers a method as a UDF. The pig_func_name
+ # identifies it, and in most cases, is the method name (the exception begin
+ # UDFs created using self.evalfunc). The class_object is the class against an instance
+ # of which the method will be called. The arity is so Pig knows how many arguments
+ # to pass to the UDF, and the output_schema defines the Schema of the output, either
+ # as a string, or as a function.
+
+ def self.register_function pig_func_name, class_object, arity, output_schema
+ self.name_and_add_class_object
+
+ pig_func_name = pig_func_name.to_s
+
+ reg = EvalFunc.new class_object, pig_func_name, arity, output_schema
+
+ @@functions_to_register[pig_func_name] = reg
+ end
+
+ def self.set_class_object_to_name_and_add func
+ self.name_and_add_class_object
+ @@class_object_to_name_and_add = func
+ end
+
+ # This method provides the most succinct way to define a UDF. The syntax is as follows:
+ #
+ # UdfName = PigUdf.evalfunc('int') do |arg1|
+ # return arg.length
+ # end
+ #
+ # EvalFunc takes one parameter, the schema to be returned, and a block which will represent
+ # the method call.
+ #
+ # In the case that this will be used, then it will be one class with one function,
+ # and the function name will be UdfName. It is essential that UdfName begin with
+ # a capital letter, as this method uses a hook given to ruby where Name = Class.new
+ # will generate a class of name Name, but only if Name begins with a capital letter.
+ #
+ # The reason for naming the function "GETCLASSFROMOBJECT" is that the class object must first
+ # be returned for its name to be available. Asking it for its name before allowing "evalfunc"
+ # to return will not yield the name it is given. Thus, we plant "GETCLASSFROMOBJECT" so the next
+ # time we access @functions_to_register, we know to check.
+
+ def self.evalfunc output_schema, &blk
+ c=Class.new do
+ define_method :eval do |*args|
+ blk.call(*args)
+ end
+ end
+ self.set_class_object_to_name_and_add EvalFunc.new c, "GETFROMCLASSOBJECT", blk.arity, output_schema
+ c
+ end
+
+ # This method functions identically to evalfunc above, the only difference being that no schema
+ # needs to be given.
+
+ def self.filterfunc &blk
+ c=Class.new do
+ define_method :eval do |*args|
+ blk.call(*args)
+ end
+ end
+ self.set_class_object_to_name_and_add EvalFunc.new c, "GETFROMCLASSOBJECT", blk.arity, Schema.boolean
+ c
+ end
+
+ # This is the function which register the schema associated with a given function. There are
+ # two ways that it can be invoked, with one argument or two (thus the vague argument names).
+ #
+ # case 1: one argument
+ # In this case, output_schema's argument is the schema to be set for the next method declaration.
+ # For example:
+ # output_schema "long"
+ #
+ # The above would mean that the schema for the function following it would be set to long. The mechanism
+ # by which this is achieved is by setting a class schema variable to the schema, and the next time
+ # a method is declared in the class, the class uses the schema that was set to register the function being
+ # declared. For more information on that, see self.method_added, as this is the Ruby provided hook
+ # that is used to allow this disconnect between declaring a schema and the method declaration that follows.
+ #
+ # case 2: two arguments
+ # In this case, arg1 is the name of the function whose schema we want to set, and arg2 is
+ # the schema, ie
+ #
+ # output_schema :sum, "long"
+ #
+ # You can only use this after the function is declared, otherwise there will be an error.
+ # In this case, the information passed to the registration function is the function name,
+ # an instance of the class (so that on the Java side we can instantiate a version), the arity,
+ # and the schema. For more information on how that information is used, see self.register_function.
+ #
+ # The following two uses are identical:
+ #
+ # use 1:
+ # output_schema "long"
+ # def sum x, y
+ # return x + y
+ # end
+ #
+ # use 2:
+ # def sum x,y
+ # return x + y
+ # end
+ # output_schema :sum, "long"
+
+ def self.output_schema arg1, arg2=nil
+ if arg2
+ function_name = arg1.to_s
+ schema = arg2.to_s
+ self.register_function function_name, self, function_name, schema
+ else
+ @@schema = arg1
+ end
+ end
+
+ # This function acts identically to output_schema, except that it is not necessary to provide a schema string
+ # because a filter func will always have a set schema (it will return boolean).
+
+ def self.filter_func arg1=nil
+ schema = "FILTERFUNC"
+ if arg1
+ function_name = arg1.to_s
+ self.output_schema function_name, schema
+ else
+ self.output_schema schema
+ end
+ end
+
+ # output_schema is only useful when the function at hand has a deterministic schema. In the case that the schema
+ # needs to be dynamic, it is useful to be able to process the input schema with a function and return the appropriate
+ # output schema. An example of this might be a concat function, which takes two values and concatenates them together.
+ # This function could work for chararrays, but also for bytearrays. In that case, the output schema depends on the input schema.
+ #
+ # As with output_schema, there are two cases, and they are identical (see output_schema for a more detailed explanation).
+ # The difference, however, is that instead of passing a string ie "long", the user gives a function name. Note: the schema
+ # function does not yet have to be defined. In the case of two arguments, the same information is passed to register_function
+ # as in the case of output_schema, the difference being that while the schema is passed as a string, it has an identifier
+ # appended to it so that when this function is running in Java, we'll know that we should be using a function.
+
+ def self.output_schema_function arg1, arg2=nil #TODO allow it to also accept a block, as in ComplexPigUdf
+ schema_func = (arg2||arg1).to_sym
+ if arg2
+ function_name = arg1.to_s
+ self.register_function function_name, self, function_name, schema_func.to_sym
+ else
+ @@schema = arg1.to_sym
+ end
+ end
+
+ # Javaists love their camelCase
+ class << self
+ alias :outputSchema :output_schema
+ alias :filterFunc :filter_func
+ alias :outputSchemaFunction :output_schema_function
+ end
+
+ # This is a hook that Ruby provides that is called whenever a method is declared on the subclass.
+ # This is used so that we have visibility on the methods as they are declared, which is useful because
+ # every declared method will be registered as a UDF for use in Pig. In the case of a method that doesn't
+ # yet have a schema declared, it's return type will just be a bytearray, as in Pig.
+
+ def self.method_added function_name
+ if @@schema
+ self.register_function function_name, self, function_name, @@schema
+ elsif !@@functions_to_register[function_name]
+ self.register_function function_name, self, function_name, nil
+ end
+ @@schema = nil
+ end
+
+ # This returns the map that maintains the Function classes that have information on declared methods.
+
+ def self.get_functions_to_register
+ self.name_and_add_class_object
+
+ @@functions_to_register
+ end
+
+ # The Function class privates a convenient wrapper to store information about EvalFuncs, separating
+ # out the methods that will be used on the frontend to get information on the method registered.
+
+ class Function
+ attr_accessor :method_name
+ attr_reader :arity, :class_object
+
+ def initialize class_object, method_name, arity
+ @class_object = class_object
+ @method_name = method_name
+ @arity = arity
+ end
+
+ def required_args
+ if @arity.is_a? Numeric
+ @arity
+ else
+ @class_object.instance_method(@arity.to_sym).parameters.count {|x,y| x==:req}
+ end
+ end
+
+ def optional_args
+ if @arity.is_a? Numeric
+ 0
+ else
+ params = @class_object.instance_method(@arity.to_sym).parameters
+ return -1 if params.any? {|x,y| x==:rest}
+ params.count {|x,y| x==:opt}
+ end
+ end
+
+ # This conveniently gives an instance of the class this Function wraps, so that on the Java end
+ # it is trivial to get the object against which method calls can be made.
+
+ def get_receiver
+ @class_object.new
+ end
+
+ # This is useful for identifying the subclass Java is dealing with (EvalFunc, FilterFunc, etc)
+
+ def name
+ return self.class.to_s
+ end
+ end
+
+ class EvalFunc < Function
+ def initialize class_object, method_name, arity, schema_or_func
+ super class_object, method_name, arity
+ @schema_or_func = schema_or_func
+ end
+
+ # This is the function that will be used from Java to get the proper schema of the output.
+ # Given that users have two options, output_schema or output_schema_function, this method
+ # detects which and acts appropriately. It must be given an instance of the EvalFunc (generally
+ # the result of "get_receiver") in the case of an output_schema_function so that it can evaluate
+ # the output Schema based on the input Schema.
+
+ def schema input_schema, class_instance
+ if !@schema_or_func
+ return Schema.bytearray
+ elsif @schema_or_func.is_a? String
+ return Schema.new @schema_or_func
+ elsif @schema_or_func.is_a? Schema
+ return @schema_or_func
+ else
+ func = @schema_or_func
+ func = @class_object.instance_method(func) if func.is_a? Symbol
+ return func.bind(class_instance).call input_schema
+ end
+ end
+ end
+end
+
+# This is the base class used for Algebraic and Accumulator functions. The reason for the different
+# implementation is because there is more structure in these cases. In the case of general EvalFuncs,
+# a method is equivalent to a UDF. In the case of Algebraic and Accumulator UDFs, however, a class is
+# equivalent to a UDF. Thus, instead of keeping track of methods added, we keep track of classes
+# that extend our Algebraic and Accumulator UDF base classes.
+
+class ComplexUdfBase
+ # As with the basic PigUdf, there is a class method "output_schema" which defines the schema for the class.
+ # This method can be called anywhere (as there is not the issue of multiple UDFs to worry about). If it is not
+ # called, it will have return type bytearray.
+
+ def self.output_schema schema
+ @schema = schema
+ end
+
+ class << self
+ alias :outputSchema :output_schema
+ end
+
+ # This returns the schema, or in the case that one was not supplied, a Schema of bytearray.
+
+ def self.get_output_schema
+ Schema.new(@schema||Schema.bytearray)
+ end
+
+ # Since a class = a UDF, in this case it makes sense to traverse the tree of decendant classes
+ # in order to pull all of the registered classes. It's important to note
+
+ def self.classes_to_register
+ classes = {}
+ ObjectSpace.each_object(Class) do |c|
+ classes[c.to_s] = c if c.ancestors.include?(self) and (c != self)
+ end
+ classes
+ end
+
+ # This is a method that can be used by Pig to ensure that all of the necessary methods are present, so that
+ # the function will throw an error on parsing instead of on execution. This is a shell implementation
+ # to ensure that necessary_methods is called by a subclass, which will then generate the proper implementation.
+
+ def self.check_if_necessary_methods_present
+ throw "Need to declare the methods that should be present"
+ end
+
+ # This is a method that, if called at the class level, defines a set of methods that must be called
+ # by any child classes (ie UDFs).
+
+ def self.necessary_methods *m
+ self.instance_eval "def self.check_if_necessary_methods_present; #{Array(m).inspect}.all? { |m| self.method_defined? m }; end"
+ end
+end
+
+# This is the class that any Accumulator UDF must extend. The necessary_methods call ensures that all
+# child classes have the necessary methods implemented. AccumulatorPigUdfs support dynamic output_schema.
+# To do so, register a block with the schema function, as so:
+# output_schema do |input|
+# return input
+# end
+#
+# In the case of a non-dyanamic output schema, it's possible to stil just set output_schema "long".
+#
+# an example of an accumulator UDF is:
+#
+# class SUM < AccumulatorPigUdf
+# output_schema "long"
+#
+# def exec input
+# @res ||= 0
+# input.flatten.inject(:+)
+# end
+# def get
+# @res
+# end
+# end
+
+class AccumulatorPigUdf < ComplexUdfBase
+ def self.output_schema schema=nil, &blk
+ if block_given?
+ throw "Can specify block or schema but not both!" if schema
+ throw "Block must accept one argument!" if blk.arity != 1
+ @schema = blk
+ else
+ @schema = schema
+ end
+ end
+
+ class << self
+ alias :outputSchema :output_schema
+ end
+
+ def self.get_output_schema input_schema=nil
+ if input_schema && @schema.class == Proc
+ @schema.call input_schema
+ else
+ Schema.new(@schema||Schema.bytearray)
+ end
+ end
+
+ necessary_methods :exec, :get
+end
+
+# This is the class that any Accumulator UDF must extend. The necessary_methods call ensures that all
+# child classes have the necessary methods implemented.
+#
+# an example of an Algebraic UDF is:
+#
+# class Count < AlgebraicPigUdf
+# output_schema "long"
+#
+# def initial t
+# t.nil? ? 0 : 1
+# end
+#
+# def intermed t
+# return 0 if t.nil?
+# return t.flatten.inject(:+)
+# end
+#
+# def final t
+# return intermed(t)
+# end
+# end
+
+
+class AlgebraicPigUdf < ComplexUdfBase
+ necessary_methods :initial, :intermed, :final
+end
Added: pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/AccumulatorEvalFunc.java Tue Apr 3 07:56:01 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This class is used to provide a free implementation of the EvalFunc exec function given
+ * implementation of the Accumulator interface. Instead of having to provide a redundant
+ * implementation, this provides the base exec function for free, given that the methods
+ * associated with the Accumulator interface are implemented. For information on how to
+ * implement Accumulator, see {@link Accumulator}.
+ */
+public abstract class AccumulatorEvalFunc<T> extends EvalFunc<T> implements Accumulator<T> {
+
+    @Override
+    public abstract void accumulate(Tuple b) throws IOException;
+
+    @Override
+    public abstract void cleanup();
+
+    @Override
+    public abstract T getValue();
+
+    /**
+     * Evaluates one input by treating it as the only batch given to the accumulator:
+     * {@link #accumulate(Tuple)}, then {@link #getValue()}, then {@link #cleanup()}.
+     * <p>
+     * {@code cleanup()} runs in a {@code finally} block so that a failure while accumulating
+     * or computing the value does not leave partial state behind for the next call.
+     *
+     * @param input the tuple to evaluate, passed unchanged to {@code accumulate}
+     * @return the value produced by {@code getValue()}
+     * @throws IOException if {@code accumulate} fails
+     */
+    @Override
+    public T exec(Tuple input) throws IOException {
+        try {
+            accumulate(input);
+            return getValue();
+        } finally {
+            cleanup();
+        }
+    }
+
+}
Added: pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/AlgebraicEvalFunc.java Tue Apr 3 07:56:01 2012
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.counters.PigCounterHelper;
+
+/**
+ * This class is used to provide a free implementation of the Accumulator interface
+ * and EvalFunc class in the case of an Algebraic function. Instead of having to provide
+ * redundant implementations for Accumulator and EvalFunc, implementing the
+ * getInitial, getIntermed, and getFinal methods (which implies implementing the static classes
+ * they reference) will give you an implementation of each of those for free. <br><br>
+ * One key thing to note is that if a subclass of AlgebraicEvalFunc wishes to use any constructor
+ * arguments, it MUST call super(args).
+ * <br><br>
+ * IMPORTANT: the implementation of the Accumulator interface that this class provides is good,
+ * but it is simulated. For maximum efficiency, it is important to manually implement the accumulator
+ * interface. See {@link Accumulator} for more information on how to do so.
+ */
+public abstract class AlgebraicEvalFunc<T> extends AccumulatorEvalFunc<T> implements Algebraic {
+    private EvalFunc<Tuple> initEvalFunc;
+    private EvalFunc<Tuple> intermedEvalFunc;
+    private EvalFunc<T> finalEvalFunc;
+
+    private static final BagFactory mBagFactory = BagFactory.getInstance();
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /** Outputs of the Intermediate EvalFunc gathered so far (one per batch, or one combined tuple). */
+    private DataBag intermediateDB;
+    /** Holds the single input tuple currently being fed to the Initial EvalFunc. */
+    private DataBag wrapDB;
+    /** Outputs of the Initial EvalFunc for the current batch. */
+    private DataBag accumDB;
+
+    /** One-field tuple wrapping {@link #wrapDB}; the argument passed to the Initial EvalFunc. */
+    private Tuple argTuple;
+    /** One-field tuple wrapping {@link #accumDB}; the argument passed to the Intermediate EvalFunc. */
+    private Tuple accumTup;
+
+    /**
+     * This represents the number of elements an intermediate bag must have
+     * in order for the intermediate EvalFunc to be used on it.
+     */
+    private int bagCombineThreshold = 1000;
+
+    /**
+     * This represents the factor by which the result of applying the intermediate
+     * EvalFunc must shrink the data in order for combining at this stage to be
+     * deemed "worth it."
+     */
+    private int combineFactor = 2;
+
+    private PigCounterHelper pigCounterHelper = new PigCounterHelper();
+
+    /** Whether re-combining intermediateDB is still enabled; switched off once it stops paying off. */
+    private boolean combine;
+
+    private String[] constructorArgs;
+
+    /**
+     * It is key that if a subclass has a constructor, that it calls super(args...) or else
+     * this class will not instantiate properly.
+     *
+     * @param constructorArgs arguments passed, as quoted strings, to the constructors of the
+     *        Initial, Intermediate and Final EvalFuncs
+     */
+    public AlgebraicEvalFunc(String... constructorArgs) {
+        this.constructorArgs = constructorArgs;
+    }
+
+    /**
+     * This must be implemented as per a normal Algebraic interface. See {@link Algebraic} for
+     * more information.
+     */
+    @Override
+    public abstract String getFinal();
+
+    /**
+     * This must be implemented as per a normal Algebraic interface. See {@link Algebraic} for
+     * more information.
+     */
+    @Override
+    public abstract String getInitial();
+
+    /**
+     * This must be implemented as per a normal Algebraic interface. See {@link Algebraic} for
+     * more information.
+     */
+    @Override
+    public abstract String getIntermed();
+
+    private boolean init = false;
+
+    /**
+     * This helper function instantiates an EvalFunc given its String class name, passing
+     * this instance's constructor arguments as a func spec of the form {@code base('a','b')}.
+     * NOTE(review): arguments are not escaped, so one containing a single quote would
+     * produce a malformed spec.
+     */
+    private EvalFunc<?> makeEvalFunc(String base) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(base).append("(");
+
+        boolean first = true;
+        for (String s : constructorArgs) {
+            if (!first) {
+                sb.append(",");
+            }
+            first = false;
+            sb.append("'").append(s).append("'");
+        }
+
+        sb.append(")");
+
+        return (EvalFunc<?>) PigContext.instantiateFuncFromSpec(sb.toString());
+    }
+
+    /**
+     * This is the free accumulate implementation based on the static classes provided
+     * by the Algebraic static classes. This implemention works by leveraging the
+     * initial, intermediate, and final classes provided by the algebraic interface.
+     * The exec function of the Initial EvalFunc will be called on every Tuple of the input
+     * and the output will be collected in an intermediate state. Periodically, this intermediate
+     * state will have the Intermediate EvalFunc called on it 1 or more times. The Final EvalFunc
+     * is not called until getValue() is called.
+     */
+    @Override
+    public void accumulate(Tuple input) throws IOException {
+        if (!init) {
+            initializeState();
+        }
+
+        accumDB.clear();
+
+        for (Tuple t : (DataBag) input.get(0)) {
+            wrapDB.clear();
+            wrapDB.add(t);
+            accumDB.add(initEvalFunc.exec(argTuple));
+        }
+
+        intermediateDB.add(intermedEvalFunc.exec(accumTup));
+        if (combine && intermediateDB.size() > bagCombineThreshold) {
+            combineIntermediates();
+        }
+    }
+
+    /**
+     * Allocates the working bags and argument tuples and instantiates the Initial,
+     * Intermediate and Final EvalFuncs; runs on the first accumulate() after construction
+     * or cleanup().
+     */
+    @SuppressWarnings("unchecked") // the Algebraic contract fixes these EvalFuncs' result types
+    private void initializeState() throws IOException {
+        intermediateDB = mBagFactory.newDefaultBag();
+        wrapDB = mBagFactory.newDefaultBag();
+        accumDB = mBagFactory.newDefaultBag();
+
+        argTuple = mTupleFactory.newTuple(1);
+        argTuple.set(0, wrapDB);
+
+        accumTup = mTupleFactory.newTuple(1);
+        accumTup.set(0, accumDB);
+
+        initEvalFunc = (EvalFunc<Tuple>) makeEvalFunc(getInitial());
+        intermedEvalFunc = (EvalFunc<Tuple>) makeEvalFunc(getIntermed());
+        finalEvalFunc = (EvalFunc<T>) makeEvalFunc(getFinal());
+
+        combine = true;
+        init = true;
+    }
+
+    /**
+     * Collapses intermediateDB into a single tuple by applying the Intermediate EvalFunc to it,
+     * records size estimates in counters, and turns combining off for good unless the combined
+     * size is at most 1/combineFactor of the size before combining.
+     */
+    private void combineIntermediates() throws IOException {
+        long initialSizeEstimate = intermediateDB.getMemorySize();
+        DataBag newIntermediateDB = mBagFactory.newDefaultBag();
+        Tuple t = mTupleFactory.newTuple(1);
+        t.set(0, intermediateDB);
+        newIntermediateDB.add(intermedEvalFunc.exec(t));
+        intermediateDB = newIntermediateDB;
+        long newSizeEstimate = intermediateDB.getMemorySize();
+        pigCounterHelper.incrCounter("AlgebraicEvalFunc", "InitialSizeEst", initialSizeEstimate);
+        pigCounterHelper.incrCounter("AlgebraicEvalFunc", "PostCombineSizeEst", newSizeEstimate);
+        pigCounterHelper.incrCounter("AlgebraicEvalFunc", "CombineApply", 1L);
+        if (combineFactor * newSizeEstimate > initialSizeEstimate) {
+            combine = false;
+            pigCounterHelper.incrCounter("AlgebraicEvalFunc", "CombineShutoff", 1L);
+        }
+    }
+
+    /**
+     * Per the Accumulator interface, this clears all of the variables used in the implementation,
+     * so the next accumulate() starts from a fresh state.
+     */
+    @Override
+    public void cleanup() {
+        intermediateDB = null;
+        wrapDB = null;
+        accumDB = null;
+
+        argTuple = null;
+        accumTup = null;
+
+        initEvalFunc = null;
+        intermedEvalFunc = null;
+        finalEvalFunc = null;
+
+        init = false;
+    }
+
+    /**
+     * This function returns the ultimate result. It is when getValue() is called that
+     * the Final EvalFunc's exec function is called on the accumulated data.
+     *
+     * @throws RuntimeException wrapping (as its cause) any IOException from the Final EvalFunc
+     */
+    @Override
+    public T getValue() {
+        try {
+            return finalEvalFunc.exec(mTupleFactory.newTuple(intermediateDB));
+        } catch (IOException e) {
+            throw new RuntimeException("Error in AlgebraicEvalFunc evaluating final method", e);
+        }
+    }
+
+}
Modified: pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/impl/util/JarManager.java Tue Apr 3 07:56:01 2012
@@ -126,10 +126,28 @@ public class JarManager {
mergeJar(jarFile, extraJar, null, contents);
}
for (String path: pigContext.scriptFiles) {
- addStream(jarFile, path, new FileInputStream(new File(path)),contents);
+ InputStream stream = null;
+ if (new File(path).exists()) {
+ stream = new FileInputStream(new File(path));
+ } else {
+ stream = PigContext.getClassLoader().getResourceAsStream(path);
+ }
+ if (stream==null) {
+ throw new IOException("Cannot find " + path);
+ }
+ addStream(jarFile, path, stream, contents);
}
for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
- addStream(jarFile, entry.getKey(), new FileInputStream(entry.getValue()),contents);
+ InputStream stream = null;
+ if (entry.getValue().exists()) {
+ stream = new FileInputStream(entry.getValue());
+ } else {
+ stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
+ }
+ if (stream==null) {
+ throw new IOException("Cannot find " + entry.getValue().getPath());
+ }
+ addStream(jarFile, entry.getKey(), stream, contents);
}
jarFile.putNextEntry(new ZipEntry("pigContext"));
Modified: pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java?rev=1308736&r1=1308735&r2=1308736&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/ScriptEngine.java Tue Apr 3 07:56:01 2012
@@ -47,8 +47,7 @@ public abstract class ScriptEngine {
public static enum SupportedScriptLang {
// possibly jruby in the future
- //jruby(new String[]{}, new String[]{}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
-
+ jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine");
@@ -122,7 +121,8 @@ public abstract class ScriptEngine {
* @return a stream (it is the responsibility of the caller to close it)
* @throws IllegalStateException if we could not open a stream
*/
- protected static InputStream getScriptAsStream(String scriptPath) {
+ public static InputStream getScriptAsStream(String scriptPath) {
+ //protected static InputStream getScriptAsStream(String scriptPath) {
InputStream is = null;
File file = new File(scriptPath);
if (file.exists()) {
Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAccumulatorEvalFunc.java Tue Apr 3 07:56:01 2012
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.scripting.jruby.JrubyScriptEngine.RubyFunctions;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.jruby.Ruby;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.runtime.builtin.IRubyObject;
+
+/**
+ * This class provides a bridge between Ruby classes that extend AccumulatorPigUdf
+ * and their execution in Pig. This class passes a Bag of data to the Ruby "exec"
+ * function, and ultimate gets the value by calling "get" on the class instance
+ * that receives methods.
+ */
+public class JrubyAccumulatorEvalFunc extends AccumulatorEvalFunc<Object> {
+ private Object methodReceiver;
+ private Object classObject;
+ private boolean isInitialized = false;
+ private String path;
+ private String methodName;
+
+ private static final ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
+ private static final Ruby ruby = rubyEngine.getProvider().getRuntime();
+
+ // It is meaningless to instantiate this class without a path and a method
+ private JrubyAccumulatorEvalFunc() {}
+
+ /**
+ * This constructor is used by JrubyScriptEngine to register a Ruby class as an Accumulator.
+ * The path and methodName are used to find the ruby Class, which is then instantated and used.
+ */
+ public JrubyAccumulatorEvalFunc(String path, String methodName) {
+ this.path = path;
+ this.methodName = methodName;
+ }
+
+ /**
+ * This function intializes the object that receives method calls, and the class object that
+ * has schema information. While this is only 3 lines, it is split out so that the schema
+ * function can initialize it if necessary. The class object is pulled from the ruby script
+ * registered at the path per the RubyFunctions helper methods.
+ */
+ private void initialize() {
+ classObject = RubyFunctions.getFunctions("accumulator", path).get(methodName);
+ methodReceiver = rubyEngine.callMethod(classObject, "new");
+ isInitialized = true;
+ }
+
+ /**
+ * This uses the "exec" method required of AccumulatorPigUdf Ruby classes. It streams the data bags
+ * it receives through the exec method defined on the registered class.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ if (!isInitialized)
+ initialize();
+ RubyDataBag db = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)b.get(0));
+ rubyEngine.callMethod(methodReceiver, "exec", db, IRubyObject.class);
+ }
+
+ @Override
+ public void cleanup() {
+ isInitialized = false;
+ methodReceiver = null;
+ }
+
+ /**
+ * This method calls "get" on the AccumulatorPigUdf Ruby class that was specified.
+ */
+ @Override
+ public Object getValue() {
+ IRubyObject rubyResult = rubyEngine.callMethod(methodReceiver, "get", IRubyObject.class);
+ try {
+ return PigJrubyLibrary.rubyToPig(rubyResult);
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to convert result from Ruby to Pig: " + rubyResult, e);
+ }
+ }
+
+ /**
+ * This provides the Schema of the output, and leverages the get_output_schema function on the class object
+ * that is defined on the ruby side.
+ */
+ @Override
+ public Schema outputSchema(Schema input) {
+ if (!isInitialized)
+ initialize();
+ RubySchema rs = PigJrubyLibrary.pigToRuby(ruby, input);
+ return PigJrubyLibrary.rubyToPig(rubyEngine.callMethod(classObject, "get_output_schema", rs, RubySchema.class));
+ }
+}
Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyAlgebraicEvalFunc.java Tue Apr 3 07:56:01 2012
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.scripting.jruby.JrubyScriptEngine.RubyFunctions;
+
+import org.jruby.Ruby;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.runtime.builtin.IRubyObject;
+
+/**
+ * This class provides the bridge between Ruby classes that extend the AlgebraicPigUdf
+ * "interface" by implementing an initial, intermed, and final method. Unlike EvalFuncs
+ * and Accumulators, the type must be known at compile time (ie it can't return Object),
+ * as Pig inspects the type and ensures that it is valid. This is why class specific
+ * shells are provided at the bottom. This class leverages AlgebraicEvalFunc to provide
+ * the Accumulator and EvalFunc implementations.
+ */
+public abstract class JrubyAlgebraicEvalFunc<T> extends AlgebraicEvalFunc<T> {
+ protected static BagFactory mBagFactory = BagFactory.getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ protected static final ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
+ protected static final Ruby ruby = rubyEngine.getProvider().getRuntime();
+
+ // It makes no sense to instantiate this without arguments
+ private JrubyAlgebraicEvalFunc() {}
+
+ /**
+ * The constructor takes information on the script and method being invoked and registers it with the
+ * superclass (which is necessary for AlgebraicEvalFunc).
+ */
+ public JrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName, functionName); }
+
+ /**
+ * This class invokes the initial method on the given Ruby class. As a courtesy to the user, it unwraps the
+ * DataBag that is given to Initial and passes Ruby just the first Tuple that it contains, as the contract
+ * for the Initial function in Algebraic is that it is given a DataBag with one and only one Tuple. Finally,
+ * it wraps the Ruby output in a Tuple.
+ */
+ public static class Initial extends AlgebraicFunctionWrapper<Tuple> {
+ public Initial() {}
+
+ public Initial(String fileName, String functionName) { super(fileName, functionName, "initial"); }
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ if (!isInitialized())
+ initialize();
+
+ try {
+ IRubyObject inp = PigJrubyLibrary.pigToRuby(ruby, ((DataBag)input.get(0)).iterator().next().get(0));
+ IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
+ return mTupleFactory.newTuple(PigJrubyLibrary.rubyToPig(rubyResult));
+ } catch (Exception e) {
+ throw new IOException("Error executing initial function", e);
+ }
+ }
+ }
+
+ /**
+ * This class invokes the intermed method on the given Ruby class. It passes along the DataBag contained
+ * in the Tuple it is given, and wraps the Ruby output in a Tuple.
+ */
+ public static class Intermed extends AlgebraicFunctionWrapper<Tuple> {
+ public Intermed() {}
+
+ public Intermed(String fileName, String functionName) { super(fileName, functionName, "intermed"); }
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ if (!isInitialized())
+ initialize();
+
+ try {
+ RubyDataBag inp = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)input.get(0));
+ IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
+ return mTupleFactory.newTuple(PigJrubyLibrary.rubyToPig(rubyResult));
+ } catch (Exception e) {
+ throw new IOException("Error executing intermediate function: ", e);
+ }
+ }
+ }
+
+ /**
+ * This class invokes the final method on the given Ruby class. It passes along the DataBag contained
+ * in the Tuple it is given, and the raw result.
+ */
+ public static class Final<T> extends AlgebraicFunctionWrapper<T> {
+ public Final() {}
+
+ public Final(String fileName, String functionName) { super(fileName, functionName, "final"); }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T exec(Tuple input) throws IOException {
+ if (!isInitialized())
+ initialize();
+
+ try {
+ RubyDataBag inp = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)input.get(0));
+ IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
+ return (T)PigJrubyLibrary.rubyToPig(rubyResult);
+ } catch (Exception e) {
+ throw new IOException("Error executing final function", e);
+ }
+ }
+ }
+
+ /**
+ * This is a lightweight wrapper shell that registers information on the method being called,
+ * and provides the initializer that the static Algebraic classes (Initial, Intermed, Final)
+ * will use to execute.
+ */
+ public static abstract class AlgebraicFunctionWrapper<T> extends EvalFunc<T> {
+ private String fileName;
+ private String functionName;
+
+ protected Object receiver;
+ protected boolean isInitialized = false;
+
+ protected String stage;
+
+ public String getStage() { return stage; }
+ public Object getReceiver() { return receiver; }
+ public String getFileName() { return fileName; }
+ public String getFunctionName() { return functionName; }
+
+ public AlgebraicFunctionWrapper() {}
+
+ /**
+ * In addition to registering the fileName and the functionName (which are given based on the
+ * arguments passed to super() in the containing class's constructor, each extending class
+ * will register their "stage," which will serve as the method to invoke on the Ruby class.
+ */
+ public AlgebraicFunctionWrapper(String fileName, String functionName, String stage) {
+ this.fileName = fileName;
+ this.functionName = functionName;
+ this.stage = stage;
+ }
+
+ public boolean isInitialized() { return isInitialized; }
+
+ public void initialize() {
+ receiver = rubyEngine.callMethod(RubyFunctions.getFunctions("algebraic", fileName).get(functionName), "new");
+ isInitialized = true;
+ }
+
+ @Override
+ public abstract T exec(Tuple input) throws IOException;
+ }
+
+ @Override
+ public abstract String getFinal();
+
+ @Override
+ public String getInitial() { return Initial.class.getName(); }
+
+ @Override
+ public String getIntermed() { return Intermed.class.getName(); }
+
+ /**
+ * Unlike EvalFuncs and Accumulators, the type must be known at compile time (ie it
+ * can't return Object), as Pig inspects the type and ensures that it is valid. This
+ * is why class specific shells are provided here. This is also the reason why the
+ * Ruby Algebraic interface is the only interface that does not currently allow overriding
+ * outputSchema, and a fixed one must be provided.
+ */
+ public static class BagJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<DataBag> {
+ public BagJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<DataBag> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class ChararrayJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<String> {
+ public ChararrayJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<String> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class DataByteArrayJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<DataByteArray> {
+ public DataByteArrayJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<DataByteArray> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class DoubleJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Double> {
+ public DoubleJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<Double> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class FloatJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Float> {
+ public FloatJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<Float> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class IntegerJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Integer> {
+ public IntegerJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<Integer> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class LongJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Long> {
+
+ public LongJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<Long> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class MapJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Map<?,?>> {
+ public MapJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<Map<?,?>> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+
+ public static class TupleJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Tuple> {
+ public TupleJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
+
+ @Override
+ public String getFinal() { return Final.class.getName(); }
+
+ public static class Final extends JrubyAlgebraicEvalFunc.Final<Tuple> {
+ public Final() {}
+ public Final(String fileName, String functionName) { super(fileName,functionName); }
+ }
+ }
+}
Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyEvalFunc.java Tue Apr 3 07:56:01 2012
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.backend.executionengine.ExecException;
+
+import org.jruby.Ruby;
+import org.jruby.RubyArray;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.runtime.builtin.IRubyObject;
+
+/**
+ * This class serves at the bridge between Ruby methods that
+ * are registered with and extend PigUdf, and their execution in
+ * Pig. An instance of the containing class is created, and their
+ * method name will be called against that instance. If they have
+ * a schema function associated, then when outputSchema is called,
+ * that function will be given the input Schema and the output will
+ * be given to Pig.
+ */
+public class JrubyEvalFunc extends EvalFunc<Object> {
+
+ private String methodName;
+ private long numRequiredArgs;
+ private long numOptionalArgs;
+
+ private String fileName;
+ private String functionName;
+
+ private boolean isInitialized = false;
+
+ private String schemaPiece;
+ private Object funcReceiver;
+ private Object funcInfoEncapsulator;
+
+ private static ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
+ private static Ruby ruby = rubyEngine.getProvider().getRuntime();
+
+ // Instantiating this class without arguments is meaningless
+ private JrubyEvalFunc() {}
+
+ /**
+ * The constructor for this class registers the filename of the Ruby script
+ * and the name of the method we care about. The difference between function name
+ * and method name is that the former is the name that was used to register the function,
+ * whereas the method name is the actual method that must be invoked. The two are often
+ * the same, but not always. functionName is the key to the list of EvalFuncs that
+ * are registered.
+ */
+ public JrubyEvalFunc(String filename, String functionName) throws IOException {
+ this.fileName = filename;
+ this.functionName = functionName;
+ }
+
+ /**
+ * This method initializes the objects necessary to evaluate the Ruby class on the pig side. Using
+ * the object that was saved to the functionName key by Ruby, this class gets an instance of the
+ * class that will receive method calls, as well as information on the arity.
+ */
+ private void initialize() {
+ funcInfoEncapsulator = JrubyScriptEngine.RubyFunctions.getFunctions("evalfunc", fileName).get(functionName);
+
+ funcReceiver = rubyEngine.callMethod(funcInfoEncapsulator, "get_receiver");
+ methodName = rubyEngine.callMethod(funcInfoEncapsulator, "method_name", String.class);
+ numRequiredArgs = rubyEngine.callMethod(funcInfoEncapsulator, "required_args", Long.class);
+ numOptionalArgs = rubyEngine.callMethod(funcInfoEncapsulator, "optional_args", Long.class); //TODO support varargs?
+
+ isInitialized = true;
+ }
+
+ /**
+ * The exec method passes the tuple argument to the Ruby function, and converts the result back to Pig.
+ */
+ @Override
+ public Object exec(Tuple tuple) throws IOException {
+ if (!isInitialized)
+ initialize();
+
+ try {
+ IRubyObject rubyResult = null;
+ if (tuple == null || (numRequiredArgs == 0 && numOptionalArgs == 0)) {
+ rubyResult = rubyEngine.callMethod(funcReceiver, methodName, IRubyObject.class);
+ } else {
+ Object[] args = PigJrubyLibrary.pigToRuby(ruby, tuple).toArray();
+ if (args.length >= numRequiredArgs && (numOptionalArgs == -1 || args.length <= numRequiredArgs + numOptionalArgs)) {
+ rubyResult = rubyEngine.callMethod(funcReceiver, methodName, args, IRubyObject.class);
+ } else {
+ String s = "Method " + methodName + " requires " + numRequiredArgs + " arguments and ";
+ s += ( numOptionalArgs == -1 ? "unlimitated " : numOptionalArgs ) +" optional arguments. ";
+ s += "Instead, " + args.length + " arguments given.";
+ throw new RuntimeException(s);
+ }
+ }
+ return PigJrubyLibrary.rubyToPig(rubyResult);
+ } catch (Exception e) {
+ throw new IOException("Error executing function", e);
+ }
+ }
+
+ /**
+ * This method uses the schema method of the function encapsulation object to get the Schema information for
+ * the Ruby method.
+ */
+ @Override
+ public Schema outputSchema(Schema input) {
+ if (!isInitialized)
+ initialize();
+ RubySchema rs = PigJrubyLibrary.pigToRuby(ruby, input);
+ return PigJrubyLibrary.rubyToPig(rubyEngine.callMethod(funcInfoEncapsulator, "schema", new Object[]{rs, funcReceiver}, RubySchema.class));
+ }
+}
Added: pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java?rev=1308736&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java Tue Apr 3 07:56:01 2012
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jruby;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.tools.pigstats.PigStats;
+
+import org.jruby.CompatVersion;
+import org.jruby.Ruby;
+import org.jruby.RubyArray;
+import org.jruby.RubyBoolean;
+import org.jruby.embed.LocalContextScope;
+import org.jruby.embed.LocalVariableBehavior;
+import org.jruby.embed.ScriptingContainer;
+import org.jruby.javasupport.JavaEmbedUtils.EvalUnit;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of the script engine for Jruby, which facilitates the registration
+ * of scripts as UDFs, and also provides information (via the nested class RubyFunctions)
+ * on the registered functions.
+ */
+public class JrubyScriptEngine extends ScriptEngine {
+    private static final Log LOG = LogFactory.getLog(JrubyScriptEngine.class);
+
+    //TODO test if it is necessary to have a per script (or even per method) runtime. PRO: avoid collisions CON: a bunch of runtimes, which could be slow
+    /** Single Ruby container shared by every script registered through this engine. */
+    protected static final ScriptingContainer rubyEngine;
+
+    /** Whether this instance has already added the JRuby jar and pigudf.rb to the PigContext. */
+    private boolean isInitialized = false;
+
+    static {
+        rubyEngine = new ScriptingContainer(LocalContextScope.SINGLETHREAD, LocalVariableBehavior.PERSISTENT);
+        rubyEngine.setCompatVersion(CompatVersion.RUBY1_9);
+    }
+
+    /**
+     * This is a static class which provides functionality around the functions that are registered with Pig.
+     */
+    static class RubyFunctions {
+        /**
+         * This cache maps function type to a map that maps path to a map of function name to the object
+         * which contains information about that function. In the case of an EvalFunc, there is a special
+         * function which encapsulates information about the function. In the case of an Accumulator or
+         * Algebraic, it is just an instance of the Class object that extends AccumulatorPigUdf or
+         * AlgebraicPigUdf, respectively.
+         */
+        private static Map<String, Map<String, Map<String, Object>>> functionsCache = Maps.newHashMap();
+
+        /** Script paths that have already been evaluated in the shared container. */
+        private static Map<String, Boolean> alreadyRunCache = Maps.newHashMap();
+
+        /** Function type to the Ruby expression that returns the functions of that type to register. */
+        private static Map<String, String> cacheFunction = Maps.newHashMap();
+
+        static {
+            //TODO use an enum instead?
+            cacheFunction.put("evalfunc", "PigUdf.get_functions_to_register");
+            cacheFunction.put("accumulator", "AccumulatorPigUdf.classes_to_register");
+            cacheFunction.put("algebraic", "AlgebraicPigUdf.classes_to_register");
+
+            functionsCache.put("evalfunc", new HashMap<String,Map<String,Object>>());
+            functionsCache.put("accumulator", new HashMap<String,Map<String,Object>>());
+            functionsCache.put("algebraic", new HashMap<String,Map<String,Object>>());
+        }
+
+        /**
+         * Returns the function map for one function type and script, evaluating the script in the
+         * shared container first if it has not been run yet. When the script is evaluated, all cached
+         * entries for that path (of every type) are discarded.
+         *
+         * @param path the script path
+         * @param cacheToUpdate the per-type cache (path to function map) to read and fill
+         * @param regCommand the Ruby expression that returns the function map for this type
+         * @return the function map for {@code path}
+         */
+        @SuppressWarnings("unchecked")
+        private static Map<String,Object> getFromCache(String path, Map<String,Map<String,Object>> cacheToUpdate, String regCommand) {
+            Boolean runCheck = alreadyRunCache.get(path);
+            if (runCheck == null || !runCheck.booleanValue()) {
+                for (Map<String, Map<String, Object>> cache : functionsCache.values()) {
+                    cache.remove(path);
+                }
+                runScript(path);
+                alreadyRunCache.put(path, true);
+            }
+
+            Map<String,Object> funcMap = cacheToUpdate.get(path);
+            if (funcMap == null) {
+                funcMap = (Map<String,Object>) rubyEngine.runScriptlet(regCommand);
+                cacheToUpdate.put(path, funcMap);
+            }
+            return funcMap;
+        }
+
+        /** Evaluates the script at {@code path} in the shared container and closes its stream afterwards. */
+        private static void runScript(String path) {
+            InputStream script = getScriptAsStream(path);
+            try {
+                rubyEngine.runScriptlet(script, path);
+            } finally {
+                closeQuietly(script, path);
+            }
+        }
+
+        /** Closes {@code stream} if non-null, logging (not propagating) a failure to close. */
+        private static void closeQuietly(InputStream stream, String path) {
+            if (stream == null) {
+                return;
+            }
+            try {
+                stream.close();
+            } catch (IOException e) {
+                LOG.warn("Unable to close script stream for " + path, e);
+            }
+        }
+
+        /**
+         * Returns the functions of the given type ("evalfunc", "accumulator" or "algebraic")
+         * defined by the script at {@code path}, keyed by function name.
+         */
+        public static Map<String, Object> getFunctions(String cache, String path) {
+            return getFromCache(path, functionsCache.get(cache), cacheFunction.get(cache));
+        }
+    }
+
+    /**
+     * Evaluates the script containing ruby udfs to determine what udfs are defined as well as
+     * what libraries and other external resources are necessary. These libraries and resources
+     * are then packaged with the job jar itself.
+     */
+    @Override
+    public void registerFunctions(String path, String namespace, PigContext pigContext) throws IOException {
+        if (!isInitialized) {
+            pigContext.scriptJars.add(getJarPath(Ruby.class));
+            pigContext.addScriptFile("pigudf.rb", "pigudf.rb");
+            isInitialized = true;
+        }
+
+        registerEvalFuncs(path, namespace, pigContext);
+        registerAccumulators(path, namespace, pigContext);
+        registerAlgebraics(path, namespace, pigContext);
+        shipLibraries(pigContext);
+    }
+
+    /** Registers every plain EvalFunc defined by the script as {@code namespace.method}. */
+    private static void registerEvalFuncs(String path, String namespace, PigContext pigContext) throws IOException {
+        for (String method : RubyFunctions.getFunctions("evalfunc", path).keySet()) {
+            pigContext.registerFunction(namespace + "." + method,
+                    funcSpecFor(JrubyEvalFunc.class.getCanonicalName(), path, method));
+        }
+    }
+
+    /** Registers every Accumulator UDF defined by the script, after checking it defines the required methods. */
+    private static void registerAccumulators(String path, String namespace, PigContext pigContext) throws IOException {
+        for (Map.Entry<String,Object> entry : RubyFunctions.getFunctions("accumulator", path).entrySet()) {
+            String method = entry.getKey();
+            ensureRequiredMethodsPresent(entry.getValue(), method);
+            pigContext.registerFunction(namespace + "." + method,
+                    funcSpecFor(JrubyAccumulatorEvalFunc.class.getCanonicalName(), path, method));
+        }
+    }
+
+    /**
+     * Registers every Algebraic UDF defined by the script. Algebraic UDFs need a type specific
+     * EvalFunc (not EvalFunc&lt;Object&gt;), so the class is chosen from the declared output schema.
+     */
+    private static void registerAlgebraics(String path, String namespace, PigContext pigContext) throws IOException {
+        for (Map.Entry<String,Object> entry : RubyFunctions.getFunctions("algebraic", path).entrySet()) {
+            String method = entry.getKey();
+            ensureRequiredMethodsPresent(entry.getValue(), method);
+
+            Schema schema = PigJrubyLibrary.rubyToPig(
+                    rubyEngine.callMethod(entry.getValue(), "get_output_schema", RubySchema.class));
+            pigContext.registerFunction(namespace + "." + method,
+                    funcSpecFor(algebraicEvalFuncClassName(schema, method), path, method));
+        }
+    }
+
+    /** Builds the FuncSpec {@code className('path','method')} used to instantiate a Jruby UDF wrapper. */
+    private static FuncSpec funcSpecFor(String className, String path, String method) {
+        return new FuncSpec(className + "('" + path + "','" + method + "')");
+    }
+
+    /** Fails registration if the Ruby UDF class reports that some of its required methods are missing. */
+    private static void ensureRequiredMethodsPresent(Object udfClass, String method) {
+        if (rubyEngine.callMethod(udfClass, "check_if_necessary_methods_present", RubyBoolean.class).isFalse()) {
+            throw new RuntimeException("Method " + method + " does not have all of the required methods present!");
+        }
+    }
+
+    /**
+     * Maps the type of the first field of an Algebraic UDF's output schema to the name of the
+     * nested, type specific JrubyAlgebraicEvalFunc class to instantiate.
+     *
+     * @throws ExecException if the schema is missing/empty or its first field has an unsupported type
+     */
+    private static String algebraicEvalFuncClassName(Schema schema, String method) throws IOException {
+        if (schema == null || schema.size() == 0) {
+            throw new ExecException("Unable to instantiate Algebraic EvalFunc " + method + " as its output schema is empty");
+        }
+        String typeName;
+        switch (schema.getField(0).type) {
+            case DataType.BAG: typeName = "Bag"; break;
+            case DataType.TUPLE: typeName = "Tuple"; break;
+            case DataType.CHARARRAY: typeName = "Chararray"; break;
+            case DataType.DOUBLE: typeName = "Double"; break;
+            case DataType.FLOAT: typeName = "Float"; break;
+            case DataType.INTEGER: typeName = "Integer"; break;
+            case DataType.LONG: typeName = "Long"; break;
+            case DataType.MAP: typeName = "Map"; break;
+            case DataType.BYTEARRAY: typeName = "DataByteArray"; break;
+            default: throw new ExecException("Unable to instantiate Algebraic EvalFunc " + method + " as schema type is invalid");
+        }
+        return JrubyAlgebraicEvalFunc.class.getCanonicalName() + "$" + typeName + "JrubyAlgebraicEvalFunc";
+    }
+
+    /**
+     * Adds the external dependencies found by {@link #libsToShip()} to the job: .rb files as script
+     * files, directories file by file, and anything else as a jar.
+     */
+    private void shipLibraries(PigContext pigContext) throws IOException {
+        for (String lib : libsToShip()) {
+            File libFile = new File(lib);
+            if (lib.endsWith(".rb")) {
+                pigContext.addScriptFile(lib);
+            } else if (libFile.isDirectory()) {
+                shipDirectory(pigContext, libFile);
+            } else {
+                pigContext.scriptJars.add(lib);
+            }
+        }
+    }
+
+    /**
+     * Packages the entire contents of a directory: .jar/.zip files become script jars, and other
+     * files are added under a local path rooted at the directory's own name.
+     */
+    private static void shipDirectory(PigContext pigContext, File dir) throws IOException {
+        String dirPath = dir.getPath();
+        for (File file : listRecursively(dir)) {
+            if (file.isDirectory()) {
+                continue;
+            }
+            String fileName = file.getName();
+            if (fileName.endsWith(".jar") || fileName.endsWith(".zip")) {
+                pigContext.scriptJars.add(file.getPath());
+            } else {
+                // Literal prefix strip: the path must not be interpreted as a regular expression.
+                String localPath = dir.getName() + stripPrefix(file.getPath(), dirPath);
+                pigContext.addScriptFile(localPath, file.getPath());
+            }
+        }
+    }
+
+    /** Returns {@code s} without a leading {@code prefix}, or {@code s} unchanged if it does not start with it. */
+    private static String stripPrefix(String s, String prefix) {
+        return s.startsWith(prefix) ? s.substring(prefix.length()) : s;
+    }
+
+    /**
+     * Consults the scripting container, after the script has been evaluated, to
+     * determine what dependencies to ship.
+     * <p>
+     * FIXME: Corner cases like the following: "def foobar; require 'json'; end"
+     * are NOT dealt with using this method
+     */
+    private HashSet<String> libsToShip() {
+        RubyArray loadedLibs = (RubyArray) rubyEngine.get("$\"");
+        RubyArray rubyLoadPaths = (RubyArray) rubyEngine.get("$LOAD_PATH");
+
+        // Current directory first. Build a local copy rather than inserting into the array obtained
+        // for $LOAD_PATH, which may be the interpreter's live load path.
+        List<Object> loadPaths = new ArrayList<Object>();
+        loadPaths.add("");
+        for (Object loadPath : rubyLoadPaths) {
+            loadPaths.add(loadPath);
+        }
+
+        HashSet<String> toShip = new HashSet<String>();
+        HashSet<Object> shippedLib = new HashSet<Object>();
+
+        for (Object loadPath : loadPaths) {
+            String dir = loadPath.toString();
+            for (Object lib : loadedLibs) {
+                if (lib.toString().equals("pigudf.rb") || shippedLib.contains(lib)) {
+                    continue;
+                }
+                String possiblePath = (dir.isEmpty() ? "" : dir + File.separator) + lib.toString();
+                if (new File(possiblePath).exists()) {
+                    // remove prefix ./
+                    toShip.add(possiblePath.startsWith("./") ? possiblePath.substring(2) : possiblePath);
+                    shippedLib.add(lib);
+                }
+            }
+        }
+        return toShip;
+    }
+
+    /**
+     * Lists every file and directory below {@code directory}, depth first. A directory whose
+     * contents cannot be listed (listFiles() returns null) contributes nothing and is logged.
+     */
+    private static List<File> listRecursively(File directory) {
+        List<File> files = new ArrayList<File>();
+        File[] entries = directory.listFiles();
+        if (entries == null) {
+            LOG.warn("Unable to list contents of " + directory.getPath());
+            return files;
+        }
+        for (File entry : entries) {
+            files.add(entry);
+            if (entry.isDirectory()) {
+                files.addAll(listRecursively(entry));
+            }
+        }
+        return files;
+    }
+
+    @Override
+    protected Map<String, List<PigStats>> main(PigContext pigContext, String scriptFile) throws IOException {
+        throw new UnsupportedOperationException("Unimplemented");
+    }
+
+    @Override
+    protected String getScriptingLang() {
+        return "jruby";
+    }
+
+    @Override
+    protected Map<String, Object> getParamsFromVariables() throws IOException {
+        Map<String, Object> vars = Maps.newHashMap();
+        return vars;
+    }
+}