You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/07/16 20:56:10 UTC

svn commit: r1362192 [1/2] - in /pig/trunk: ./ ivy/ src/org/apache/pig/scripting/ src/org/apache/pig/scripting/groovy/ test/ test/org/apache/pig/test/

Author: julien
Date: Mon Jul 16 18:56:10 2012
New Revision: 1362192

URL: http://svn.apache.org/viewvc?rev=1362192&view=rev
Log:
PIG-2763: Groovy UDFs (herberts via julien)

Added:
    pig/trunk/src/org/apache/pig/scripting/groovy/
    pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java
    pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java
    pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java
    pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java
    pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java
    pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java
    pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java
    pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java
    pig/trunk/test/org/apache/pig/test/TestUDFGroovy.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
    pig/trunk/test/unit-tests

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 16 18:56:10 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2763: Groovy UDFs (herberts via julien)
+
 PIG-2780: MapReduceLauncher should break early when one of the jobs throws an exception (jay23jack via daijy)
 
 PIG-2804: Remove "PIG" exec type (dvryaboy)

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon Jul 16 18:56:10 2012
@@ -184,6 +184,8 @@
       conf="compile->master"/>
     <dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}"
       conf="releaseaudit->default"/>
+    <dependency org="org.codehaus.groovy" name="groovy-all" rev="${groovy.version}"
+      conf="compile->master"/>
     <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}"
       conf="compile->master"/>
     <dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}"

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Jul 16 18:56:10 2012
@@ -31,6 +31,7 @@ jersey.version=1.8
 checkstyle.version=4.2
 ivy.version=2.2.0
 jasper.version=6.1.14
+groovy.version=1.8.6
 guava.version=11.0
 jersey-core.version=1.8
 hadoop-core.version=1.0.0

Modified: pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java Mon Jul 16 18:56:10 2012
@@ -43,14 +43,15 @@ import org.apache.pig.tools.pigstats.Pig
  * Base class for various scripting implementations
  */
 public abstract class ScriptEngine {
-    
+
     public static enum SupportedScriptLang {
 
         // possibly jruby in the future
         jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
-        jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"), 
-        javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine");
-        
+        jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
+        javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine"),
+        groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine");
+
         private static Set<String> supportedScriptLangs;
         static {
             supportedScriptLangs = new HashSet<String>();

Added: pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Groovy method as the 'accumulate' method of an
+ * Accumulator UDF. The annotation is retained at runtime and
+ * may only be placed on methods.
+ * <p>
+ * The value of the annotation is the name of the Accumulator UDF
+ * it belongs to.
+ * <p>
+ * The annotated method MUST NOT be static and MUST accept a single {@code groovy.lang.Tuple} as parameter.
+ * Its return value, if any, is ignored.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AccumulatorAccumulate {
+  String value(); // name of the Accumulator UDF the annotated method belongs to
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Groovy method as the 'cleanup' method of an
+ * Accumulator UDF. The annotation is retained at runtime and
+ * may only be placed on methods.
+ * <p>
+ * The value of the annotation is the name of the Accumulator UDF
+ * it belongs to.
+ * <p>
+ * The annotated method MUST NOT be static and MUST NOT accept any parameters.
+ * Its return value, if any, is ignored.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AccumulatorCleanup {
+  String value(); // name of the Accumulator UDF the annotated method belongs to
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Groovy method as the 'getValue' method of an
+ * Accumulator UDF. The annotation is retained at runtime and
+ * may only be placed on methods.
+ * <p>
+ * The value of the annotation is the name of the Accumulator UDF
+ * it belongs to.
+ * <p>
+ * The annotated method MUST NOT be static and MUST NOT accept any parameters.
+ * Its return value will be that of the Accumulator UDF.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AccumulatorGetValue {
+  String value(); // name of the Accumulator UDF the annotated method belongs to
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Groovy method as the exec method of the 'Final' class
+ * of an Algebraic UDF. The annotation is retained at runtime and
+ * may only be placed on methods.
+ * <p>
+ * The value of the annotation is the name of the Algebraic UDF
+ * it belongs to.
+ * <p>
+ * The annotated method MUST accept a single {@code groovy.lang.Tuple} parameter.
+ * Its return value will be that of the Algebraic UDF.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AlgebraicFinal {
+  String value(); // name of the Algebraic UDF the annotated method belongs to
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Groovy method as the exec method of the 'Initial' class
+ * of an Algebraic UDF. The annotation is retained at runtime and
+ * may only be placed on methods.
+ * <p>
+ * The value of the annotation is the name of the Algebraic UDF
+ * it belongs to.
+ * <p>
+ * The annotated method MUST accept a single {@code groovy.lang.Tuple} parameter,
+ * and MUST return a {@code groovy.lang.Tuple} or {@code Object[]}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AlgebraicInitial {
+  String value(); // name of the Algebraic UDF the annotated method belongs to
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Groovy method as the exec method of the 'Intermed' class
+ * of an Algebraic UDF. The annotation is retained at runtime and
+ * may only be placed on methods.
+ * <p>
+ * The value of the annotation is the name of the Algebraic UDF
+ * it belongs to.
+ * <p>
+ * The annotated method MUST accept a single {@code groovy.lang.Tuple} parameter,
+ * and MUST return a {@code groovy.lang.Tuple} or {@code Object[]}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AlgebraicIntermed {
+  String value(); // name of the Algebraic UDF the annotated method belongs to
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Accumulator UDF whose accumulate, getValue and cleanup steps are each
+ * delegated to a Groovy method wrapped in a {@link GroovyEvalFuncObject}.
+ * The getValue and cleanup wrappers are built on the invocation target of
+ * the accumulate wrapper.
+ */
+public class GroovyAccumulatorEvalFunc extends AccumulatorEvalFunc<Object> {
+
+  private final GroovyEvalFuncObject groovyAccumulate;
+  private final GroovyEvalFuncObject groovyGetValue;
+  private final GroovyEvalFuncObject groovyCleanup;
+
+  /**
+   * Builds the three Groovy method wrappers.
+   *
+   * @param path forwarded to each GroovyEvalFuncObject
+   * @param namespace forwarded to each GroovyEvalFuncObject
+   * @param accumulatorMethod not used by this constructor
+   * @param accumulateMethod method name backing {@link #accumulate(Tuple)}
+   * @param getValueMethod method name backing {@link #getValue()}
+   * @param cleanupMethod method name backing {@link #cleanup()}
+   * @throws IOException propagated from the construction of the wrappers
+   */
+  public GroovyAccumulatorEvalFunc(String path, String namespace, String accumulatorMethod, String accumulateMethod,
+      String getValueMethod, String cleanupMethod) throws IOException {
+    // Use the same invocation target for accumulate/getValue/cleanup.
+    this.groovyAccumulate = new GroovyEvalFuncObject(path, namespace, accumulateMethod);
+    this.groovyGetValue = new GroovyEvalFuncObject(path, namespace, getValueMethod, this.groovyAccumulate.getInvocationTarget());
+    this.groovyCleanup = new GroovyEvalFuncObject(path, namespace, cleanupMethod, this.groovyAccumulate.getInvocationTarget());
+  }
+
+  /** Passes the tuple to the Groovy accumulate method; its result is discarded. */
+  @Override
+  public void accumulate(Tuple b) throws IOException {
+    this.groovyAccumulate.exec(b);
+  }
+
+  /**
+   * Invokes the Groovy cleanup method with a null input.
+   *
+   * @throws RuntimeException wrapping any IOException raised by the call
+   */
+  @Override
+  public void cleanup() {
+    try {
+      this.groovyCleanup.exec(null);
+    } catch (IOException ioe) {
+      // Rethrow unchecked, as getValue() does, rather than silently dropping the failure.
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Invokes the Groovy getValue method with a null input and returns its result.
+   *
+   * @throws RuntimeException wrapping any IOException raised by the call
+   */
+  @Override
+  public Object getValue() {
+    try {
+      return this.groovyGetValue.exec(null);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /** Delegates schema resolution to the getValue wrapper. */
+  @Override
+  public Schema outputSchema(Schema input) {
+    return this.groovyGetValue.outputSchema(input);
+  }
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+public abstract class GroovyAlgebraicEvalFunc<T> extends AlgebraicEvalFunc<T> {
+
+  public GroovyAlgebraicEvalFunc() {
+  }
+
+  public GroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+      String finalMethod) {
+    super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+  }
+
+  @Override
+  public abstract String getFinal();
+
+  @Override
+  public String getInitial() {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return Intermed.class.getName();
+  }
+
+  public static abstract class AlgebraicFunctionWrapper<T> extends GroovyEvalFunc<T> { // common base of Initial/Intermed/Final
+    public AlgebraicFunctionWrapper() {
+    }
+
+    public AlgebraicFunctionWrapper(String path, String namespace, String methodName) throws IOException {
+      super(path, namespace, methodName);
+    }
+  }
+
+  public static class Initial extends AlgebraicFunctionWrapper<Tuple> { // wraps the Groovy method named by initialMethod
+    public Initial() {
+    }
+
+    public Initial(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+        String finalMethod) throws IOException {
+      super(path, namespace, initialMethod);
+    }
+  }
+
+  public static class Intermed extends AlgebraicFunctionWrapper<Tuple> { // wraps the Groovy method named by intermedMethod
+    public Intermed() {
+    }
+
+    public Intermed(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+        String finalMethod) throws IOException {
+      super(path, namespace, intermedMethod);
+    }
+  }
+
+  public static class Final<T> extends AlgebraicFunctionWrapper<T> { // wraps the Groovy method named by finalMethod
+    public Final() {
+    }
+
+    public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+        String finalMethod) throws IOException {
+      super(path, namespace, finalMethod);
+    }
+  }
+
+  /**
+   * Unlike EvalFuncs and Accumulators, the return type must be known at
+   * compile time (i.e. it can't be Object), as Pig inspects the type and
+   * ensures that it is valid. This is why a type-specific shell class is
+   * provided below for each supported return type; every shell returns the
+   * name of its own nested Final class from getFinal().
+   */
+  public static class DataBagGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<DataBag> {
+    public DataBagGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<DataBag> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class TupleGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Tuple> {
+    public TupleGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Tuple> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class ChararrayGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<String> {
+    public ChararrayGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<String> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class DataByteArrayGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<DataByteArray> {
+    public DataByteArrayGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<DataByteArray> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class DoubleGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Double> {
+    public DoubleGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Double> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class FloatGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Float> {
+    public FloatGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Float> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class IntegerGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Integer> {
+    public IntegerGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Integer> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class LongGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Long> {
+    public LongGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Long> {
+      public Final() { // NOTE(review): only this Final shell declares a no-arg constructor; confirm whether the others need one
+      }
+
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class MapGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Map<String, ?>> {
+    public MapGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Map<String, ?>> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+
+  public static class BooleanGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Boolean> {
+    public BooleanGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+        String intermedMethod, String finalMethod) throws IOException {
+      super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+    }
+
+    @Override
+    public String getFinal() {
+      return Final.class.getName();
+    }
+
+    public static class Final extends GroovyAlgebraicEvalFunc.Final<Boolean> {
+      public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+          String finalMethod) throws IOException {
+        super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+      }
+    }
+  }
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import groovy.util.ResourceException;
+import groovy.util.ScriptException;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.OutputSchema;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.scripting.ScriptEngine;
+
+/**
+ * Pig {@link EvalFunc} that delegates to one method of a Groovy script class,
+ * invoked through reflection.
+ *
+ * Arguments are converted with {@code GroovyUtils.pigToGroovy} before the call
+ * and the result is converted back with {@code GroovyUtils.groovyToPig}.
+ * The output schema comes either from an {@code @OutputSchema} annotation on
+ * the method or from another Groovy method named by
+ * {@code @OutputSchemaFunction}; when both are present the schema function
+ * wins.
+ *
+ * @param <T> type returned to Pig by {@link #exec(Tuple)}
+ */
+public class GroovyEvalFunc<T> extends EvalFunc<T> {
+
+  /** Schema parsed from {@code @OutputSchema}; null when absent or superseded. */
+  private Schema schema = null;
+
+  /** Wrapper around the method named by {@code @OutputSchemaFunction}; null when absent. */
+  private GroovyEvalFunc<?> schemaFunction = null;
+
+  /** The Groovy method invoked by {@link #exec(Tuple)}. */
+  protected Method method = null;
+
+  /** Script classes already loaded, keyed by script path, shared by all instances. */
+  private static Map<String, Class<?>> scriptClasses = new ConcurrentHashMap<String, Class<?>>();
+
+  /** Object the method is invoked on; stays null when the method is static. */
+  private Object invocationTarget;
+
+  /**
+   * Creates an unbound instance: no method is resolved, so {@link #exec(Tuple)}
+   * cannot be used on it.
+   */
+  public GroovyEvalFunc() {
+  }
+
+  /**
+   * Same as the four-argument constructor with a null target.
+   *
+   * @throws IOException see {@link #GroovyEvalFunc(String, String, String, Object)}
+   */
+  public GroovyEvalFunc(String path, String namespace, String methodName) throws IOException {
+    this(path, namespace, methodName, null);
+  }
+
+  /**
+   * Binds this function to the method {@code methodName} of the script at
+   * {@code path}.
+   *
+   * @param path       script path handed to the Groovy engine's {@code loadScriptByName}
+   * @param namespace  only forwarded to the schema-function wrapper, if one is created
+   * @param methodName name of the method to wrap; exactly one method returned by
+   *                   {@link Class#getMethods()} must carry this name
+   * @param target     object to invoke a non-static method on; when null a new
+   *                   instance of the script class is created. Ignored for
+   *                   static methods.
+   * @throws IOException if the script cannot be loaded, the method is missing
+   *                     or ambiguous, the schema cannot be parsed, or the
+   *                     script class cannot be instantiated
+   */
+  public GroovyEvalFunc(String path, String namespace, String methodName, Object target) throws IOException {
+    Class<?> c = scriptClasses.get(path);
+
+    if (null == c) {
+      try {
+        c = GroovyScriptEngine.getEngine().loadScriptByName(path);
+      } catch (ScriptException se) {
+        throw new IOException(se);
+      } catch (ResourceException re) {
+        throw new IOException(re);
+      }
+      // Only write to the cache on a miss; a hit already holds this value.
+      scriptClasses.put(path, c);
+    }
+
+    //
+    // Look the method up by name only: overloads cannot be told apart,
+    // so more than one match is an error.
+    //
+
+    int matches = 0;
+
+    for (Method m : c.getMethods()) {
+      if (m.getName().equals(methodName)) {
+        this.method = m;
+        matches++;
+      }
+    }
+
+    if (null == this.method) {
+      throw new IOException("Method " + methodName + " was not found in '" + path + "'");
+    }
+
+    if (matches > 1) {
+      throw new IOException("There are " + matches + " methods with name '" + methodName + "', please make sure method names are unique within the Groovy class.");
+    }
+
+    //
+    // Extract schema. @OutputSchemaFunction is looked up first so that it
+    // takes precedence over @OutputSchema regardless of the order in which
+    // reflection reports the annotations; this matches outputSchema().
+    //
+
+    OutputSchemaFunction schemaFunctionAnnotation = this.method.getAnnotation(OutputSchemaFunction.class);
+
+    if (null != schemaFunctionAnnotation) {
+      this.schemaFunction = new GroovyEvalFuncObject(path, namespace, schemaFunctionAnnotation.value());
+    } else {
+      OutputSchema schemaAnnotation = this.method.getAnnotation(OutputSchema.class);
+      if (null != schemaAnnotation) {
+        this.schema = Utils.getSchemaFromString(schemaAnnotation.value());
+      }
+    }
+
+    //
+    // For static method, invocation target is null, for non
+    // static method, create/set invocation target unless passed
+    // to the constructor
+    //
+
+    if (!Modifier.isStatic(this.method.getModifiers())) {
+      if (null != target) {
+        this.invocationTarget = target;
+      } else {
+        try {
+          this.invocationTarget = c.newInstance();
+        } catch (InstantiationException ie) {
+          throw new IOException(ie);
+        } catch (IllegalAccessException iae) {
+          throw new IOException(iae);
+        }
+      }
+    }
+  }
+
+  /**
+   * Invokes the wrapped Groovy method with the fields of {@code input} as
+   * positional arguments.
+   *
+   * @param input tuple whose fields become the arguments; a null tuple results
+   *              in a call with no arguments
+   * @return the converted result, or null when the wrapped method is {@code void}
+   * @throws IOException if the reflective call fails or the method itself throws
+   */
+  @Override
+  public T exec(Tuple input) throws IOException {
+
+    Object[] args = new Object[null != input ? input.size() : 0];
+
+    for (int i = 0; i < args.length; i++) {
+      args[i] = GroovyUtils.pigToGroovy(input.get(i));
+    }
+
+    try {
+      if (this.method.getReturnType().equals(Void.TYPE)) {
+        //
+        // Invoke method but return null if method is 'void',
+        // this is done so we can wrap 'accumulate' and 'cleanup' methods too.
+        //
+        this.method.invoke(this.invocationTarget, args);
+        return null;
+      } else {
+        // T is erased at runtime, so this cast cannot be checked here.
+        @SuppressWarnings("unchecked")
+        T result = (T) GroovyUtils.groovyToPig(this.method.invoke(this.invocationTarget, args));
+        return result;
+      }
+    } catch (InvocationTargetException ite) {
+      throw new IOException(ite);
+    } catch (IllegalAccessException iae) {
+      throw new IOException(iae);
+    }
+  }
+
+  /**
+   * Returns the output schema of the wrapped method.
+   *
+   * When a schema function is configured it is called with a single string
+   * argument, the textual form of {@code input} without its enclosing braces,
+   * and its string result is parsed as a schema. Otherwise the schema parsed
+   * from {@code @OutputSchema} (possibly null) is returned.
+   *
+   * @throws RuntimeException wrapping any parse or invocation failure of the
+   *                          schema function
+   */
+  @Override
+  public Schema outputSchema(Schema input) {
+    if (null != this.schemaFunction) {
+      try {
+        Tuple t = TupleFactory.getInstance().newTuple(1);
+        // Strip enclosing '{}' from schema
+        t.set(0, input.toString().replaceAll("^\\{", "").replaceAll("\\}$", ""));
+        return Utils.getSchemaFromString((String) this.schemaFunction.exec(t));
+      } catch (ParserException pe) {
+        throw new RuntimeException(pe);
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    } else {
+      return this.schema;
+    }
+  }
+
+  /**
+   * Returns the object the wrapped method is invoked on, or null when the
+   * method is static or this instance is unbound.
+   */
+  public Object getInvocationTarget() {
+    return this.invocationTarget;
+  }
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.io.IOException;
+
+/**
+ * {@link GroovyEvalFunc} with its type parameter bound to {@code Object}.
+ * It adds no behaviour of its own; every constructor defers to the
+ * superclass constructor with the same arguments.
+ */
+public class GroovyEvalFuncObject extends GroovyEvalFunc<Object> {
+
+  /** Creates an instance through the superclass no-argument constructor. */
+  public GroovyEvalFuncObject() {
+  }
+
+  /**
+   * Forwards the three arguments, unchanged, to the superclass constructor.
+   *
+   * @throws IOException if the superclass constructor throws it
+   */
+  public GroovyEvalFuncObject(
+      String path,
+      String namespace,
+      String method) throws IOException {
+    super(path, namespace, method);
+  }
+
+  /**
+   * Forwards the four arguments, unchanged, to the superclass constructor.
+   *
+   * @throws IOException if the superclass constructor throws it
+   */
+  public GroovyEvalFuncObject(
+      String path,
+      String namespace,
+      String method,
+      Object target) throws IOException {
+    super(path, namespace, method, target);
+  }
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import groovy.lang.Tuple;
+import groovy.util.ResourceException;
+import groovy.util.ScriptException;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.OutputSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.BooleanGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.ChararrayGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.DataBagGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.DataByteArrayGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.DoubleGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.FloatGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.IntegerGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.LongGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.MapGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.TupleGroovyAlgebraicEvalFunc;
+import org.apache.pig.tools.pigstats.PigStats;
+
+/**
+ * {@link ScriptEngine} for Groovy: runs the static {@code main} method of a
+ * Groovy script and exposes annotated methods of a Groovy script as Pig UDFs.
+ *
+ * {@link #registerFunctions} registers three kinds of UDFs:
+ * plain functions (methods carrying {@code @OutputSchema} or
+ * {@code @OutputSchemaFunction}), Algebraic UDFs (methods annotated with
+ * {@code @AlgebraicInitial}, {@code @AlgebraicIntermed} and
+ * {@code @AlgebraicFinal} sharing one annotation value) and Accumulator UDFs
+ * (methods annotated with {@code @AccumulatorAccumulate},
+ * {@code @AccumulatorGetValue} and {@code @AccumulatorCleanup} sharing one
+ * annotation value).
+ */
+public class GroovyScriptEngine extends ScriptEngine {
+
+  private static final Log LOG = LogFactory.getLog(GroovyScriptEngine.class);
+
+  /** Groovy engine shared by every instance of this class and by GroovyEvalFunc. */
+  private static groovy.util.GroovyScriptEngine gse;
+
+  /** Set once the Groovy jar has been offered to a PigContext's script jars. */
+  private static boolean isInitialized = false;
+
+  static {
+    try {
+      gse = new groovy.util.GroovyScriptEngine("");
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Registers the UDFs declared in {@code scriptFile}, then invokes the
+   * script's {@code public static void main(String[] args)} method with the
+   * arguments deserialized from the {@code PigContext.PIG_CMD_ARGS_REMAINDERS}
+   * property.
+   *
+   * @return the value of {@code getPigStatsMap()}
+   * @throws IOException if the script has no suitable {@code main} method, or
+   *                     wrapping any other failure while loading or running it
+   */
+  @Override
+  protected Map<String, List<PigStats>> main(PigContext context, String scriptFile) throws IOException {
+
+    final String noMainMessage = "No method 'public static void main(String[] args)' was found.";
+
+    PigServer pigServer = new PigServer(context, false);
+
+    //
+    // Register dependencies
+    //
+
+    String groovyJar = getJarPath(groovy.util.GroovyScriptEngine.class);
+
+    if (null != groovyJar) {
+      pigServer.registerJar(groovyJar);
+    }
+
+    //
+    // Register UDFs
+    //
+
+    registerFunctions(scriptFile, null, context);
+
+    try {
+
+      //
+      // Load the script
+      //
+
+      Class<?> c = gse.loadScriptByName(scriptFile);
+
+      //
+      // Extract the main method. Class.getMethod never returns null, it
+      // throws NoSuchMethodException, so that is what signals a missing main.
+      //
+
+      Method main;
+
+      try {
+        main = c.getMethod("main", String[].class);
+      } catch (NoSuchMethodException nsme) {
+        throw new IOException(noMainMessage, nsme);
+      }
+
+      if (!Modifier.isStatic(main.getModifiers()) || !Modifier.isPublic(main.getModifiers()) || !Void.TYPE.equals(main.getReturnType())) {
+        throw new IOException(noMainMessage);
+      }
+
+      //
+      // Invoke the main method
+      //
+
+      Object[] args = new Object[1];
+      String[] argv = (String[])ObjectSerializer.deserialize(context.getProperties().getProperty(PigContext.PIG_CMD_ARGS_REMAINDERS));
+      args[0] = argv;
+
+      main.invoke(null, args);
+
+    } catch (IOException ioe) {
+      // Already the declared exception type: do not bury it in a second IOException.
+      throw ioe;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    return getPigStatsMap();
+  }
+
+  /**
+   * Scans the public methods of the script at {@code path} and registers the
+   * UDFs they declare with {@code pigContext}.
+   *
+   * @param path       script path handed to the Groovy engine
+   * @param namespace  prefix for registered names; null is treated as the
+   *                   empty string, meaning no prefix
+   * @param pigContext context receiving the registrations
+   * @throws IOException if the script cannot be loaded, an annotated method has
+   *                     the wrong signature, a UDF declares the same stage
+   *                     twice, or a UDF lacks one of its three stages
+   * @throws RuntimeException if the Final method of an Algebraic UDF has an
+   *                          unsupported return type
+   */
+  @Override
+  public void registerFunctions(String path, String namespace, PigContext pigContext) throws IOException {
+
+    if (!isInitialized) {
+      // getJarPath may yield null (main() guards against that as well);
+      // never put a null entry into the script jars.
+      String groovyJar = getJarPath(groovy.util.GroovyScriptEngine.class);
+      if (null != groovyJar) {
+        pigContext.scriptJars.add(groovyJar);
+      }
+      isInitialized = true;
+    }
+
+    //
+    // Normalize the namespace up front so that every kind of UDF sees the
+    // same value, including scripts which declare no plain function.
+    //
+
+    namespace = (namespace == null) ? "" : namespace;
+
+    String prefix = "".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR);
+
+    try {
+      //
+      // Read file
+      //
+
+      Class<?> c = gse.loadScriptByName(path);
+
+      //
+      // Keep track of initial/intermed/final methods of Algebraic UDFs
+      //
+
+      Map<String, Method[]> algebraicMethods = new HashMap<String, Method[]>();
+
+      //
+      // Keep track of accumulate/getValue/cleanup methods of Accumulator UDFs
+      //
+
+      Map<String, Method[]> accumulatorMethods = new HashMap<String, Method[]>();
+
+      //
+      // Loop over the methods
+      //
+
+      for (Method method : c.getMethods()) {
+        Annotation[] annotations = method.getAnnotations();
+
+        if (0 == annotations.length) {
+          continue;
+        }
+
+        boolean isAccumulator = false;
+        Schema schema = null;
+        String schemaFunction = null;
+
+        for (Annotation annotation : annotations) {
+          Class<? extends Annotation> type = annotation.annotationType();
+
+          if (type.equals(OutputSchema.class)) {
+            schema = Utils.getSchemaFromString(((OutputSchema) annotation).value());
+          } else if (type.equals(OutputSchemaFunction.class)) {
+            schemaFunction = ((OutputSchemaFunction) annotation).value();
+          } else if (type.equals(AlgebraicInitial.class)) {
+            requireSingleTupleParameter(path, method, annotation);
+            requireTupleReturnType(path, method, "Initial");
+            recordMethod(algebraicMethods, "Algebraic", ((AlgebraicInitial) annotation).value(), 0, method,
+                annotation, path);
+          } else if (type.equals(AlgebraicIntermed.class)) {
+            requireSingleTupleParameter(path, method, annotation);
+            requireTupleReturnType(path, method, "Intermed");
+            recordMethod(algebraicMethods, "Algebraic", ((AlgebraicIntermed) annotation).value(), 1, method,
+                annotation, path);
+          } else if (type.equals(AlgebraicFinal.class)) {
+            // The return type of Final is not checked here; it selects the
+            // wrapper class when the Algebraic UDF is registered below.
+            requireSingleTupleParameter(path, method, annotation);
+            recordMethod(algebraicMethods, "Algebraic", ((AlgebraicFinal) annotation).value(), 2, method,
+                annotation, path);
+          } else if (type.equals(AccumulatorAccumulate.class)) {
+            requireSingleTupleParameter(path, method, annotation);
+            recordMethod(accumulatorMethods, "Accumulator", ((AccumulatorAccumulate) annotation).value(), 0, method,
+                annotation, path);
+          } else if (type.equals(AccumulatorGetValue.class)) {
+            if (0 != method.getParameterTypes().length) {
+              throw new IOException(path + ": methods annotated with @AccumulatorGetValue take no parameters.");
+            }
+            // getValue methods are only reachable through the Accumulator UDF
+            isAccumulator = true;
+            recordMethod(accumulatorMethods, "Accumulator", ((AccumulatorGetValue) annotation).value(), 1, method,
+                annotation, path);
+          } else if (type.equals(AccumulatorCleanup.class)) {
+            if (0 != method.getParameterTypes().length) {
+              throw new IOException(path + ": methods annotated with @AccumulatorCleanup take no parameters and return void.");
+            }
+            recordMethod(accumulatorMethods, "Accumulator", ((AccumulatorCleanup) annotation).value(), 2, method,
+                annotation, path);
+          }
+        }
+
+        //
+        // Only register functions which have an output schema declared
+        //
+
+        if (null == schema && null == schemaFunction) {
+          LOG.info(path
+              + ": Only methods annotated with @OutputSchema or @OutputSchemaFunction (but not with @AccumulatorGetValue) are exposed to Pig, skipping method '"
+              + method.getName() + "'");
+          continue;
+        }
+
+        //
+        // Both annotations may be present; this is only reported, not rejected
+        //
+
+        if (null != schema && null != schemaFunction) {
+          LOG.info("Annotation @OutputSchemaFunction has precedence over @OutputSchema for method '" + method.getName() + "'");
+        }
+
+        //
+        // Register methods annotated with 'OutputSchema' or
+        // 'OutputSchemaFunction', unless they are accumulators' getValue
+        // methods
+        //
+
+        if (!isAccumulator) {
+          FuncSpec spec = new FuncSpec(GroovyEvalFuncObject.class.getCanonicalName() + "('" + path + "','" + namespace
+              + "','" + method.getName() + "')");
+          pigContext.registerFunction(prefix + method.getName(), spec);
+          LOG.info(path + ": Register Groovy UDF: " + prefix + method.getName());
+        }
+      }
+
+      //
+      // Now register algebraic methods
+      //
+
+      for (Map.Entry<String, Method[]> entry : algebraicMethods.entrySet()) {
+
+        String algebraic = entry.getKey();
+        Method[] algmethods = entry.getValue();
+
+        if (null == algmethods[0]) {
+          throw new IOException(path + ": Algebraic UDF '" + algebraic + "' does not have an Initial method defined.");
+        } else if (null == algmethods[1]) {
+          throw new IOException(path + ": Algebraic UDF '" + algebraic + "' does not have an Intermed method defined.");
+        } else if (null == algmethods[2]) {
+          throw new IOException(path + ": Algebraic UDF '" + algebraic + "' does not have a Final method defined.");
+        }
+
+        //
+        // The wrapper class is chosen from the return type of 'Final'
+        //
+
+        String className = algebraicWrapperClassName(algmethods[2].getReturnType());
+
+        if (null == className) {
+          throw new RuntimeException(path + ": Unknown return type for Algebraic UDF '" + algebraic + "'");
+        }
+
+        FuncSpec spec = new FuncSpec(className + "('" + path + "','" + namespace + "','" + algebraic + "','"
+            + algmethods[0].getName() + "','" + algmethods[1].getName() + "','" + algmethods[2].getName() + "')");
+        pigContext.registerFunction(prefix + algebraic, spec);
+
+        LOG.info("Register Groovy Algebraic UDF: " + prefix + algebraic);
+      }
+
+      //
+      // Now register Accumulator UDFs
+      //
+
+      for (Map.Entry<String, Method[]> entry : accumulatorMethods.entrySet()) {
+
+        String accumulator = entry.getKey();
+        Method[] accumethods = entry.getValue();
+
+        if (null == accumethods[0]) {
+          throw new IOException(path + ": Accumulator UDF '" + accumulator + "' does not have an Accumulate method defined.");
+        } else if (null == accumethods[1]) {
+          throw new IOException(path + ": Accumulator UDF '" + accumulator + "' does not have a GetValue method defined.");
+        } else if (null == accumethods[2]) {
+          throw new IOException(path + ": Accumulator UDF '" + accumulator + "' does not have a Cleanup method defined.");
+        }
+
+        FuncSpec spec = new FuncSpec(GroovyAccumulatorEvalFunc.class.getName() + "('" + path + "','" + namespace + "','"
+            + accumulator + "','" + accumethods[0].getName() + "','" + accumethods[1].getName() + "','"
+            + accumethods[2].getName() + "')");
+        pigContext.registerFunction(prefix + accumulator, spec);
+
+        LOG.info("Register Groovy Accumulator UDF: " + prefix + accumulator);
+      }
+    } catch (ResourceException re) {
+      throw new IOException(re);
+    } catch (ScriptException se) {
+      throw new IOException(se);
+    }
+
+  }
+
+  /** Always returns null. */
+  @Override
+  protected Map<String, Object> getParamsFromVariables() throws IOException {
+    return null;
+  }
+
+  /** Returns the constant {@code "groovy"}. */
+  @Override
+  protected String getScriptingLang() {
+    return "groovy";
+  }
+
+  /** Returns the Groovy engine shared by this class. */
+  protected static groovy.util.GroovyScriptEngine getEngine() {
+    return gse;
+  }
+
+  /**
+   * Fails unless {@code method} declares exactly one parameter of type
+   * {@code groovy.lang.Tuple}; the message names {@code annotation}.
+   */
+  private static void requireSingleTupleParameter(String path, Method method, Annotation annotation)
+      throws IOException {
+    Class<?>[] params = method.getParameterTypes();
+    if (1 != params.length || !Tuple.class.equals(params[0])) {
+      throw new IOException(path + ": methods annotated with @" + annotation.annotationType().getSimpleName()
+          + " MUST take a single groovy.lang.Tuple as parameter.");
+    }
+  }
+
+  /**
+   * Fails unless {@code method} returns {@code groovy.lang.Tuple} or
+   * {@code Object[]}; {@code stage} ("Initial" or "Intermed") goes into the message.
+   */
+  private static void requireTupleReturnType(String path, Method method, String stage) throws IOException {
+    Class<?> returnType = method.getReturnType();
+    if (!returnType.equals(Tuple.class) && !returnType.equals(Object[].class)) {
+      throw new IOException(path + ":" + method.getName()
+          + " Algebraic UDF " + stage + " method MUST return type groovy.lang.Tuple or Object[].");
+    }
+  }
+
+  /**
+   * Stores {@code method} in slot {@code idx} of the three-element array kept
+   * for {@code udfName}, creating the array on first use, and fails if that
+   * slot is already taken. {@code kind} ("Algebraic" or "Accumulator") only
+   * appears in the error message.
+   */
+  private static void recordMethod(Map<String, Method[]> registry, String kind, String udfName, int idx,
+      Method method, Annotation annotation, String path) throws IOException {
+    Method[] slots = registry.get(udfName);
+
+    if (null == slots) {
+      slots = new Method[3];
+      registry.put(udfName, slots);
+    }
+
+    if (null != slots[idx]) {
+      throw new IOException(path + ": " + kind + " UDF '" + udfName + "' already has an "
+          + annotation.annotationType().getSimpleName() + " method defined ('" + slots[idx] + "')");
+    }
+
+    slots[idx] = method;
+  }
+
+  /**
+   * Maps the return type of an Algebraic UDF's Final method to the name of the
+   * matching GroovyAlgebraicEvalFunc subclass, or null when the type is not
+   * supported.
+   */
+  private static String algebraicWrapperClassName(Class<?> returnType) {
+    if (returnType.equals(Tuple.class) || returnType.equals(Object[].class)
+        || returnType.equals(org.apache.pig.data.Tuple.class)) {
+      return TupleGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(List.class) || returnType.equals(DataBag.class)) {
+      return DataBagGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(Boolean.class) || returnType.equals(boolean.class)) {
+      return BooleanGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(byte[].class) || returnType.equals(DataByteArray.class)) {
+      return DataByteArrayGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(String.class)) {
+      return ChararrayGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(Double.class) || returnType.equals(double.class) || returnType.equals(BigDecimal.class)) {
+      return DoubleGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(Float.class) || returnType.equals(float.class)) {
+      return FloatGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(Byte.class) || returnType.equals(byte.class) || returnType.equals(Short.class)
+        || returnType.equals(short.class) || returnType.equals(Integer.class) || returnType.equals(int.class)) {
+      return IntegerGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(Long.class) || returnType.equals(long.class) || returnType.equals(BigInteger.class)) {
+      return LongGroovyAlgebraicEvalFunc.class.getName();
+    } else if (returnType.equals(Map.class)) {
+      return MapGroovyAlgebraicEvalFunc.class.getName();
+    }
+    return null;
+  }
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * Static helpers converting values between their Groovy and Pig
+ * representations.
+ */
+public class GroovyUtils {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /**
+   * Converts an object created on the Groovy side to its Pig counterpart.
+   *
+   * The conversions are as follows:
+   *
+   * <pre>
+   * Groovy                        Pig
+   * Object[]                      Tuple
+   * groovy.lang.Tuple             Tuple
+   * org.apache.pig.data.Tuple     Tuple (same instance)
+   * org.apache.pig.data.DataBag   DataBag (same instance)
+   * java.util.Map                 Map (keys converted, then toString()'ed)
+   * java.util.List                DataBag (non tuple elements are wrapped in a Tuple)
+   * Byte/Short/Integer            int
+   * Long/BigInteger               long
+   * Float                         float
+   * Double/BigDecimal             double
+   * String                        chararray
+   * byte[]                        DataByteArray (copy)
+   * Boolean                       boolean
+   * null                          null
+   * </pre>
+   *
+   * anything else raises an exception
+   *
+   * @param groovyObject
+   *          Groovy object to convert, may be null
+   * @return the Pig counterpart of groovyObject
+   *
+   * @throws ExecException
+   *           if groovyObject (or one of its members) has no Pig counterpart,
+   *           or if a Map contains a null key
+   */
+  public static Object groovyToPig(Object groovyObject) throws ExecException {
+    Object pigObject = null;
+
+    if (groovyObject instanceof Object[] || groovyObject instanceof groovy.lang.Tuple) {
+      //
+      // Allocate a List<Object> that will be filled with converted
+      // objects and later passed to newTuple.
+      //
+
+      List<Object> pigObjects = new ArrayList<Object>();
+
+      //
+      // Convert each member of groovyObject
+      //
+
+      if (groovyObject instanceof Object[]) {
+        for (Object o : (Object[]) groovyObject) {
+          pigObjects.add(groovyToPig(o));
+        }
+      } else {
+        for (Object o : (Iterable<?>) groovyObject) {
+          pigObjects.add(groovyToPig(o));
+        }
+      }
+
+      //
+      // Create the result Tuple
+      //
+
+      pigObject = tupleFactory.newTuple(pigObjects);
+    } else if (groovyObject instanceof Tuple || groovyObject instanceof DataBag) {
+      //
+      // Pass Pig Tuple/DataBag as is (no copy is made).
+      // This enables the creation of instances of DataBag which do not fit in
+      // memory
+      //
+      // It is advised to wrap objects into a call to groovyToPig
+      // prior to adding them to a Tuple or DataBag
+      //
+
+      pigObject = groovyObject;
+    } else if (groovyObject instanceof Map) {
+      //
+      // Allocate a Map
+      //
+
+      Map<String, Object> pigMap = new HashMap<String, Object>();
+
+      //
+      // Iterate over Groovy Map, putting each entry into pigMap.
+      // Keys are converted first and then turned into Strings.
+      //
+
+      for (Map.Entry<?, ?> entry : ((Map<?, ?>) groovyObject).entrySet()) {
+        Object key = groovyToPig(entry.getKey());
+
+        if (null == key) {
+          // Fail with the declared exception instead of an unexplained
+          // NullPointerException on key.toString()
+          throw new ExecException("Unable to cast a Map with a null key to a Pig datatype.");
+        }
+
+        pigMap.put(key.toString(), groovyToPig(entry.getValue()));
+      }
+
+      pigObject = pigMap;
+    } else if (groovyObject instanceof List) {
+      //
+      // Allocate a DataBag
+      //
+
+      DataBag bag = bagFactory.newDefaultBag();
+
+      //
+      // Pig's bags can only contain tuples, so converted elements which
+      // are not already tuples are wrapped in a single field Tuple.
+      // Elements converted to null are added as null tuples.
+      //
+
+      for (Object o : (List<?>) groovyObject) {
+        Object p = groovyToPig(o);
+
+        if (p instanceof Tuple || null == p) {
+          bag.add((Tuple) p);
+        } else {
+          // Wrap value in a Tuple if it's not already a tuple
+          bag.add(tupleFactory.newTuple(p));
+        }
+      }
+
+      pigObject = bag;
+    } else if (groovyObject instanceof Integer || groovyObject instanceof Long || groovyObject instanceof Float
+        || groovyObject instanceof Double || groovyObject instanceof String || groovyObject instanceof Boolean) {
+      //
+      // Types which have an equivalent in Pig and are immutable
+      // are passed as is
+      //
+      pigObject = groovyObject;
+    } else if (groovyObject instanceof Byte || groovyObject instanceof Short) {
+      pigObject = ((Number) groovyObject).intValue();
+    } else if (groovyObject instanceof BigInteger) {
+      // Note: BigInteger.longValue() only keeps the low-order 64 bits
+      pigObject = ((Number) groovyObject).longValue();
+    } else if (groovyObject instanceof BigDecimal) {
+      // Note: BigDecimal.doubleValue() may lose precision
+      pigObject = ((Number) groovyObject).doubleValue();
+    } else if (groovyObject instanceof byte[]) {
+      //
+      // Clone the byte array so the DataByteArray does not share it
+      // with the Groovy side
+      //
+
+      pigObject = new DataByteArray(((byte[]) groovyObject).clone());
+    } else if (null == groovyObject) {
+      pigObject = null;
+    } else {
+      throw new ExecException("Unable to cast " + groovyObject.getClass().getName() + " to a Pig datatype.");
+    }
+
+    return pigObject;
+  }
+
+  /**
+   * Converts an object created on the Pig side to its Groovy counterpart.
+   *
+   * The conversions are as follows:
+   *
+   * <pre>
+   * Pig                     Groovy
+   * Tuple                   groovy.lang.Tuple
+   * DataBag                 groovy.lang.Tuple containing the bag's size and
+   *                         an iterator on its content
+   * Map                     java.util.Map
+   * int/long/float/double   as is
+   * chararray               String
+   * bytearray               byte[] (copy)
+   * boolean                 boolean
+   * null                    null
+   * </pre>
+   *
+   * anything else raises an exception
+   *
+   * @param pigObject
+   *          Pig object to convert, may be null
+   * @return the Groovy counterpart of pigObject
+   * @throws ExecException
+   *           if pigObject (or one of its members) has no Groovy counterpart
+   */
+  public static Object pigToGroovy(Object pigObject) throws ExecException {
+
+    Object groovyObject = null;
+
+    if (pigObject instanceof Tuple) {
+      Tuple tuple = (Tuple) pigObject;
+      Object[] a = new Object[tuple.size()];
+
+      int i = 0;
+      for (Object oo : tuple.getAll()) {
+        a[i++] = pigToGroovy(oo);
+      }
+
+      groovyObject = new groovy.lang.Tuple(a);
+    } else if (pigObject instanceof DataBag) {
+      //
+      // Return a Groovy Tuple containing the bag's size and an
+      // iterator on its content (Iterator will return instances of
+      // groovy.lang.Tuple)
+      //
+
+      DataBag bag = (DataBag) pigObject;
+
+      Object[] size_iterator = new Object[2];
+      size_iterator[0] = bag.size();
+      size_iterator[1] = new DataBagGroovyIterator(bag.iterator());
+      groovyObject = new groovy.lang.Tuple(size_iterator);
+    } else if (pigObject instanceof Map) {
+      Map<String, Object> m = new HashMap<String, Object>();
+
+      for (Map.Entry<?, ?> entry : ((Map<?, ?>) pigObject).entrySet()) {
+        m.put((String) pigToGroovy(entry.getKey()), pigToGroovy(entry.getValue()));
+      }
+
+      groovyObject = m;
+    } else if (pigObject instanceof Number || pigObject instanceof String || pigObject instanceof Boolean) {
+      groovyObject = pigObject;
+    } else if (pigObject instanceof DataByteArray) {
+      //
+      // Allocate a new byte array so we don't use the original array
+      //
+      byte[] b = new byte[((DataByteArray) pigObject).size()];
+
+      System.arraycopy(((DataByteArray) pigObject).get(), 0, b, 0, b.length);
+
+      groovyObject = b;
+    } else if (null == pigObject) {
+      groovyObject = null;
+    } else {
+      throw new ExecException("Unable to cast pig datatype " + pigObject.getClass().getName() + " to a suitable Groovy Object.");
+    }
+
+    return groovyObject;
+  }
+
+  /**
+   * Read-only iterator over the content of a DataBag which converts each
+   * Pig Tuple to a groovy.lang.Tuple as it is returned.
+   */
+  public static class DataBagGroovyIterator implements Iterator<groovy.lang.Tuple> {
+
+    private final Iterator<Tuple> iter;
+
+    /**
+     * @param iter
+     *          iterator on the Pig tuples to expose to Groovy
+     */
+    public DataBagGroovyIterator(Iterator<Tuple> iter) {
+      this.iter = iter;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    /**
+     * Returns the next tuple, converted using pigToGroovy.
+     *
+     * @throws RuntimeException
+     *           wrapping the ExecException raised if the conversion fails
+     */
+    @Override
+    public groovy.lang.Tuple next() {
+      try {
+        return (groovy.lang.Tuple) pigToGroovy(iter.next());
+      } catch (ExecException ee) {
+        throw new RuntimeException(ee);
+      }
+    }
+
+    /**
+     * Removal is not supported. As required by the Iterator contract this is
+     * signaled with an exception instead of silently ignoring the call.
+     *
+     * @throws UnsupportedOperationException
+     *           always
+     */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() is not supported on a DataBag iterator.");
+    }
+  }
+}

Added: pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Method-level annotation carrying a single String value.
+ *
+ * It is retained at runtime so it can be read reflectively.
+ * NOTE(review): the value is presumably the name of the function computing
+ * the annotated method's output schema -- confirm against the code reading
+ * this annotation.
+ */
+@Target({ ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OutputSchemaFunction {
+  /** The String attached to the annotated method. */
+  String value();
+}