You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/07/16 20:56:10 UTC
svn commit: r1362192 [1/2] - in /pig/trunk: ./ ivy/
src/org/apache/pig/scripting/ src/org/apache/pig/scripting/groovy/ test/
test/org/apache/pig/test/
Author: julien
Date: Mon Jul 16 18:56:10 2012
New Revision: 1362192
URL: http://svn.apache.org/viewvc?rev=1362192&view=rev
Log:
PIG-2763: Groovy UDFs (herberts via julien)
Added:
pig/trunk/src/org/apache/pig/scripting/groovy/
pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java
pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java
pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java
pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java
pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java
pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java
pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java
pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java
pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java
pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java
pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java
pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java
pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java
pig/trunk/test/org/apache/pig/test/TestUDFGroovy.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
pig/trunk/test/unit-tests
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 16 18:56:10 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2763: Groovy UDFs (herberts via julien)
+
PIG-2780: MapReduceLauncher should break early when one of the jobs throws an exception (jay23jack via daijy)
PIG-2804: Remove "PIG" exec type (dvryaboy)
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon Jul 16 18:56:10 2012
@@ -184,6 +184,8 @@
conf="compile->master"/>
<dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}"
conf="releaseaudit->default"/>
+ <dependency org="org.codehaus.groovy" name="groovy-all" rev="${groovy.version}"
+ conf="compile->master"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}"
conf="compile->master"/>
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}"
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Jul 16 18:56:10 2012
@@ -31,6 +31,7 @@ jersey.version=1.8
checkstyle.version=4.2
ivy.version=2.2.0
jasper.version=6.1.14
+groovy.version=1.8.6
guava.version=11.0
jersey-core.version=1.8
hadoop-core.version=1.0.0
Modified: pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java?rev=1362192&r1=1362191&r2=1362192&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java Mon Jul 16 18:56:10 2012
@@ -43,14 +43,15 @@ import org.apache.pig.tools.pigstats.Pig
* Base class for various scripting implementations
*/
public abstract class ScriptEngine {
-
+
public static enum SupportedScriptLang {
// possibly jruby in the future
jruby(new String[]{"ruby", "jruby"}, new String[]{"rb"}, "org.apache.pig.scripting.jruby.JrubyScriptEngine"),
- jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
- javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine");
-
+ jython(new String[]{"python", "jython"}, new String[]{"py"}, "org.apache.pig.scripting.jython.JythonScriptEngine"),
+ javascript(new String[]{}, new String[]{"js"}, "org.apache.pig.scripting.js.JsScriptEngine"),
+ groovy(new String[]{}, new String[]{"groovy"}, "org.apache.pig.scripting.groovy.GroovyScriptEngine");
+
private static Set<String> supportedScriptLangs;
static {
supportedScriptLangs = new HashSet<String>();
Added: pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorAccumulate.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is to be used on a Groovy method which
+ * is part of an Accumulator UDF to indicate that it will
+ * act as the 'accumulate' method of the UDF.
+ *
+ * The value of the annotation is the name of the Accumulator UDF
+ * it belongs to.
+ *
+ * The annotated method MUST NOT be static and MUST accept a single groovy.lang.Tuple as parameter.
+ * Its return value, if any, is ignored.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AccumulatorAccumulate {
+ String value();
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorCleanup.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is to be used on a Groovy method which
+ * is part of an Accumulator UDF to indicate that it will
+ * act as the 'cleanup' method of the UDF.
+ *
+ * The value of the annotation is the name of the Accumulator UDF
+ * it belongs to.
+ *
+ * The annotated method MUST NOT be static and MUST NOT accept any parameters.
+ * Its return value, if any, is ignored.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AccumulatorCleanup {
+ String value();
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AccumulatorGetValue.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is to be used on a Groovy method which
+ * is part of an Accumulator UDF to indicate that it will
+ * act as the 'getValue' method of the UDF.
+ *
+ * The value of the annotation is the name of the Accumulator UDF
+ * it belongs to.
+ *
+ * The annotated method MUST NOT be static and MUST NOT accept any parameters.
+ * Its return value will be that of the Accumulator UDF.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AccumulatorGetValue {
+ String value();
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicFinal.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is to be used on a Groovy method which
+ * is part of an Algebraic UDF to indicate that it will
+ * act as the exec method of the 'Final' class of the UDF.
+ *
+ * The value of the annotation is the name of the Algebraic UDF
+ * it belongs to.
+ *
+ * The annotated method MUST NOT accept any parameters.
+ * Its return value will be that of the Algebraic UDF.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AlgebraicFinal {
+ String value();
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicInitial.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is to be used on a Groovy method which
+ * is part of an Algebraic UDF to indicate that it will
+ * act as the exec method of the 'Initial' class of the UDF.
+ *
+ * The value of the annotation is the name of the Algebraic UDF
+ * it belongs to.
+ *
+ * The annotated method MUST accept a single groovy.lang.Tuple parameter,
+ * and MUST return a groovy.lang.Tuple or Object[].
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AlgebraicInitial {
+ String value();
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/AlgebraicIntermed.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is to be used on a Groovy method which
+ * is part of an Algebraic UDF to indicate that it will
+ * act as the exec method of the 'Intermed' class of the UDF.
+ *
+ * The value of the annotation is the name of the Algebraic UDF
+ * it belongs to.
+ *
+ * The annotated method MUST accept a single groovy.lang.Tuple parameter,
+ * and MUST return a groovy.lang.Tuple or Object[].
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AlgebraicIntermed {
+ String value();
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAccumulatorEvalFunc.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class GroovyAccumulatorEvalFunc extends AccumulatorEvalFunc<Object> {
+
+ private GroovyEvalFuncObject groovyAccumulate;
+ private GroovyEvalFuncObject groovyGetValue;
+ private GroovyEvalFuncObject groovyCleanup;
+
+ public GroovyAccumulatorEvalFunc(String path, String namespace, String accumulatorMethod, String accumulateMethod,
+ String getValueMethod, String cleanupMethod) throws IOException {
+ //
+ // Use the same invocation target for accumulate/getValue/cleanup
+ //
+ this.groovyAccumulate = new GroovyEvalFuncObject(path, namespace, accumulateMethod);
+ this.groovyGetValue = new GroovyEvalFuncObject(path, namespace, getValueMethod, this.groovyAccumulate.getInvocationTarget());
+ this.groovyCleanup = new GroovyEvalFuncObject(path, namespace, cleanupMethod, this.groovyAccumulate.getInvocationTarget());
+ }
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ this.groovyAccumulate.exec(b);
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ this.groovyCleanup.exec(null);
+ } catch (IOException ioe) {
+ }
+ }
+
+ @Override
+ public Object getValue() {
+ try {
+ return this.groovyGetValue.exec(null);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return this.groovyGetValue.outputSchema(input);
+ }
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyAlgebraicEvalFunc.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+public abstract class GroovyAlgebraicEvalFunc<T> extends AlgebraicEvalFunc<T> {
+
+ public GroovyAlgebraicEvalFunc() {
+ }
+
+ public GroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public abstract String getFinal();
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ @Override
+ public String getIntermed() {
+ return Intermed.class.getName();
+ }
+
+ public static abstract class AlgebraicFunctionWrapper<T> extends GroovyEvalFunc<T> {
+ public AlgebraicFunctionWrapper() {
+ }
+
+ public AlgebraicFunctionWrapper(String path, String namespace, String methodName) throws IOException {
+ super(path, namespace, methodName);
+ }
+ }
+
+ public static class Initial extends AlgebraicFunctionWrapper<Tuple> {
+ public Initial() {
+ }
+
+ public Initial(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, initialMethod);
+ }
+ }
+
+ public static class Intermed extends AlgebraicFunctionWrapper<Tuple> {
+ public Intermed() {
+ }
+
+ public Intermed(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, intermedMethod);
+ }
+ }
+
+ public static class Final<T> extends AlgebraicFunctionWrapper<T> {
+ public Final() {
+ }
+
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, finalMethod);
+ }
+ }
+
+ /**
+ * Unlike EvalFuncs and Accumulators, the type must be known at compile time
+ * (ie it
+ * can't return Object), as Pig inspects the type and ensures that it is
+ * valid. This
+ * is why class specific shells are provided here.
+ */
+ public static class DataBagGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<DataBag> {
+ public DataBagGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<DataBag> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class TupleGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Tuple> {
+ public TupleGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Tuple> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class ChararrayGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<String> {
+ public ChararrayGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<String> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class DataByteArrayGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<DataByteArray> {
+ public DataByteArrayGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<DataByteArray> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class DoubleGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Double> {
+ public DoubleGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Double> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class FloatGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Float> {
+ public FloatGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Float> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class IntegerGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Integer> {
+ public IntegerGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Integer> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class LongGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Long> {
+ public LongGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Long> {
+ public Final() {
+ }
+
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class MapGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Map<String, ?>> {
+ public MapGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Map<String, ?>> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+
+ public static class BooleanGroovyAlgebraicEvalFunc extends GroovyAlgebraicEvalFunc<Boolean> {
+ public BooleanGroovyAlgebraicEvalFunc(String path, String namespace, String methodName, String initialMethod,
+ String intermedMethod, String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ public static class Final extends GroovyAlgebraicEvalFunc.Final<Boolean> {
+ public Final(String path, String namespace, String methodName, String initialMethod, String intermedMethod,
+ String finalMethod) throws IOException {
+ super(path, namespace, methodName, initialMethod, intermedMethod, finalMethod);
+ }
+ }
+ }
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import groovy.util.ResourceException;
+import groovy.util.ScriptException;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.OutputSchema;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.scripting.ScriptEngine;
+
+public class GroovyEvalFunc<T> extends EvalFunc<T> {
+
+ private Schema schema = null;
+
+ private GroovyEvalFunc schemaFunction = null;
+
+ protected Method method = null;
+
+ private static Map<String, Class> scriptClasses = new ConcurrentHashMap<String, Class>();
+
+ private Object invocationTarget;
+
+ public GroovyEvalFunc() {
+ }
+
+ public GroovyEvalFunc(String path, String namespace, String methodName) throws IOException {
+ this(path, namespace, methodName, null);
+ }
+
+ public GroovyEvalFunc(String path, String namespace, String methodName, Object target) throws IOException {
+ String fqmn = "".equals(namespace) ? methodName : namespace + ScriptEngine.NAMESPACE_SEPARATOR + methodName;
+
+ Class c = scriptClasses.get(path);
+
+ if (null == c) {
+ try {
+ c = GroovyScriptEngine.getEngine().loadScriptByName(path);
+ } catch (ScriptException se) {
+ throw new IOException(se);
+ } catch (ResourceException re) {
+ throw new IOException(re);
+ }
+ }
+
+ scriptClasses.put(path, c);
+
+ Method[] methods = c.getMethods();
+
+ int matches = 0;
+
+ for (Method m : methods) {
+ if (m.getName().equals(methodName)) {
+ this.method = m;
+ matches++;
+ }
+ }
+
+ if (null == this.method) {
+ throw new IOException("Method " + methodName + " was not found in '" + path + "'");
+ }
+
+ if (matches > 1) {
+ throw new IOException("There are " + matches + " methods with name '" + methodName + "', please make sure method names are unique within the Groovy class.");
+ }
+
+ //
+ // Extract schema
+ //
+
+ Annotation[] annotations = this.method.getAnnotations();
+
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(OutputSchemaFunction.class)) {
+ this.schemaFunction = new GroovyEvalFuncObject(path, namespace, ((OutputSchemaFunction) annotation).value());
+ break;
+ } else if (annotation.annotationType().equals(OutputSchema.class)) {
+ this.schema = Utils.getSchemaFromString(((OutputSchema) annotation).value());
+ break;
+ }
+ }
+
+ //
+ // For static method, invocation target is null, for non
+ // static method, create/set invocation target unless passed
+ // to the constructor
+ //
+
+ if (!Modifier.isStatic(this.method.getModifiers())) {
+ if (null != target) {
+ this.invocationTarget = target;
+ } else {
+ try {
+ this.invocationTarget = c.newInstance();
+ } catch (InstantiationException ie) {
+ throw new IOException(ie);
+ } catch (IllegalAccessException iae) {
+ throw new IOException(iae);
+ }
+ }
+ }
+ }
+
+ @Override
+ public T exec(Tuple input) throws IOException {
+
+ Object[] args = new Object[null != input ? input.size() : 0];
+
+ for (int i = 0; i < args.length; i++) {
+ args[i] = GroovyUtils.pigToGroovy(input.get(i));
+ }
+
+ try {
+ if (this.method.getReturnType().equals(Void.TYPE)) {
+ //
+ // Invoke method but return null if method is 'void',
+ // this is done so we can wrap 'accumulate' and 'cleanup' methods too.
+ //
+ this.method.invoke(this.invocationTarget, args);
+ return null;
+ } else {
+ return (T) GroovyUtils.groovyToPig(this.method.invoke(this.invocationTarget, args));
+ }
+ } catch (InvocationTargetException ite) {
+ throw new IOException(ite);
+ } catch (IllegalAccessException iae) {
+ throw new IOException(iae);
+ }
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ if (null != this.schemaFunction) {
+ try {
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ // Strip enclosing '{}' from schema
+ t.set(0, input.toString().replaceAll("^\\{", "").replaceAll("\\}$", ""));
+ return Utils.getSchemaFromString((String) this.schemaFunction.exec(t));
+ } catch (ParserException pe) {
+ throw new RuntimeException(pe);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ } else {
+ return this.schema;
+ }
+ }
+
+ public Object getInvocationTarget() {
+ return this.invocationTarget;
+ }
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFuncObject.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.io.IOException;
+
+public class GroovyEvalFuncObject extends GroovyEvalFunc<Object> {
+ public GroovyEvalFuncObject() {
+ super();
+ }
+
+ public GroovyEvalFuncObject(String path, String namespace, String method) throws IOException {
+ super(path, namespace, method);
+ }
+
+ public GroovyEvalFuncObject(String path, String namespace, String method, Object target) throws IOException {
+ super(path, namespace, method, target);
+ }
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyScriptEngine.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import groovy.lang.Tuple;
+import groovy.util.ResourceException;
+import groovy.util.ScriptException;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.OutputSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.BooleanGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.ChararrayGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.DataBagGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.DataByteArrayGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.DoubleGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.FloatGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.IntegerGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.LongGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.MapGroovyAlgebraicEvalFunc;
+import org.apache.pig.scripting.groovy.GroovyAlgebraicEvalFunc.TupleGroovyAlgebraicEvalFunc;
+import org.apache.pig.tools.pigstats.PigStats;
+
+public class GroovyScriptEngine extends ScriptEngine {
+
+ private static final Log LOG = LogFactory.getLog(GroovyScriptEngine.class);
+
+ private static groovy.util.GroovyScriptEngine gse;
+
+ private static boolean isInitialized = false;
+
+ static {
+ try {
+ gse = new groovy.util.GroovyScriptEngine("");
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ protected Map<String, List<PigStats>> main(PigContext context, String scriptFile) throws IOException {
+
+ PigServer pigServer = new PigServer(context, false);
+
+ //
+ // Register dependencies
+ //
+
+ String groovyJar = getJarPath(groovy.util.GroovyScriptEngine.class);
+
+ if (null != groovyJar) {
+ pigServer.registerJar(groovyJar);
+ }
+
+ //
+ // Register UDFs
+ //
+
+ registerFunctions(scriptFile, null, context);
+
+ try {
+
+ //
+ // Load the script
+ //
+
+ Class c = gse.loadScriptByName(scriptFile);
+
+ //
+ // Extract the main method
+ //
+
+ Method main = c.getMethod("main", String[].class);
+
+ if (null == main || !Modifier.isStatic(main.getModifiers()) || !Modifier.isPublic(main.getModifiers()) || !Void.TYPE.equals(main.getReturnType())) {
+ throw new IOException("No method 'public static void main(String[] args)' was found.");
+ }
+
+ //
+ // Invoke the main method
+ //
+
+ Object[] args = new Object[1];
+ String[] argv = (String[])ObjectSerializer.deserialize(context.getProperties().getProperty(PigContext.PIG_CMD_ARGS_REMAINDERS));
+ args[0] = argv;
+
+ main.invoke(null, args);
+
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ return getPigStatsMap();
+ }
+
+ @Override
+ public void registerFunctions(String path, String namespace, PigContext pigContext) throws IOException {
+
+ if (!isInitialized) {
+ pigContext.scriptJars.add(getJarPath(groovy.util.GroovyScriptEngine.class));
+ isInitialized = true;
+ }
+
+ try {
+ //
+ // Read file
+ //
+
+ Class c = gse.loadScriptByName(path);
+
+ //
+ // Keep track of initial/intermed/final methods of Albegraic UDFs
+ //
+
+ Map<String, Method[]> algebraicMethods = new HashMap<String, Method[]>();
+
+ //
+ // Keep track of accumulate/getValue/cleanup methods of Accumulator UDFs
+ //
+
+ Map<String, Method[]> accumulatorMethods = new HashMap<String, Method[]>();
+
+ //
+ // Loop over the methods
+ //
+
+ Method[] methods = c.getMethods();
+
+ for (Method method : methods) {
+ Annotation[] annotations = method.getAnnotations();
+
+ boolean isAccumulator = false;
+
+ if (annotations.length > 0) {
+ Schema schema = null;
+ String schemaFunction = null;
+
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(OutputSchema.class)) {
+ schema = Utils.getSchemaFromString(((OutputSchema) annotation).value());
+ } else if (annotation.annotationType().equals(OutputSchemaFunction.class)) {
+ schemaFunction = ((OutputSchemaFunction) annotation).value();
+ } else if (isAlgebraic(annotation)) {
+
+ String algebraic = null;
+
+ int idx = 0;
+
+ if (annotation.annotationType().equals(AlgebraicInitial.class)) {
+ //
+ // Check that method accepts a single Tuple as parameter.
+ //
+ Class<?>[] params = method.getParameterTypes();
+ if (1 != params.length || !Tuple.class.equals(params[0])) {
+ throw new IOException(path + ": methods annotated with @AlgebraicInitial MUST take a single groovy.lang.Tuple as parameter.");
+ }
+ if (!method.getReturnType().equals(Tuple.class) && !method.getReturnType().equals(Object[].class)) {
+ throw new IOException(path + ":" + method.getName()
+ + " Algebraic UDF Initial method MUST return type groovy.lang.Tuple or Object[].");
+ }
+ algebraic = ((AlgebraicInitial) annotation).value();
+ idx = 0;
+ } else if (annotation.annotationType().equals(AlgebraicIntermed.class)) {
+ //
+ // Check that method accepts a single Tuple as parameter.
+ //
+ Class<?>[] params = method.getParameterTypes();
+ if (1 != params.length || !Tuple.class.equals(params[0])) {
+ throw new IOException(path + ": methods annotated with @AlgebraicIntermed MUST take a single groovy.lang.Tuple as parameter.");
+ }
+ if (!method.getReturnType().equals(Tuple.class) && !method.getReturnType().equals(Object[].class)) {
+ throw new IOException(path + ":" + method.getName()
+ + " Algebraic UDF Intermed method MUST return type groovy.lang.Tuple or Object[].");
+ }
+ algebraic = ((AlgebraicIntermed) annotation).value();
+ idx = 1;
+ } else {
+ //
+ // Check that method accepts a single Tuple as parameter.
+ //
+ Class<?>[] params = method.getParameterTypes();
+ if (1 != params.length || !Tuple.class.equals(params[0])) {
+ throw new IOException(path + ": methods annotated with @AlgebraicFinal MUST take a single groovy.lang.Tuple as parameter.");
+ }
+ algebraic = ((AlgebraicFinal) annotation).value();
+ idx = 2;
+ }
+
+ Method[] algmethods = algebraicMethods.get(algebraic);
+
+ if (null == algmethods) {
+ algmethods = new Method[3];
+ algebraicMethods.put(algebraic, algmethods);
+ }
+
+ if (null != algmethods[idx]) {
+ throw new IOException(path + ": Algebraic UDF '" + algebraic + "' already has an "
+ + annotation.annotationType().getSimpleName() + " method defined ('" + algmethods[idx] + "')");
+ }
+
+ algmethods[idx] = method;
+ } else if (isAccumulator(annotation)) {
+ String accumulator = null;
+
+ int idx = 0;
+
+ if (annotation.annotationType().equals(AccumulatorAccumulate.class)) {
+ //
+ // Check that method accepts a single Tuple as parameter.
+ //
+ Class<?>[] params = method.getParameterTypes();
+ if (1 != params.length || !Tuple.class.equals(params[0])) {
+ throw new IOException(path + ": methods annotated with @AccumulatorAccumulate MUST take a single groovy.lang.Tuple as parameter.");
+ }
+ accumulator = ((AccumulatorAccumulate) annotation).value();
+ idx = 0;
+ } else if (annotation.annotationType().equals(AccumulatorGetValue.class)) {
+ //
+ // Check that method does not accept any parameters.
+ //
+ Class<?>[] params = method.getParameterTypes();
+ if (0 != params.length) {
+ throw new IOException(path + ": methods annotated with @AccumulatorGetValue take no parameters.");
+ }
+ accumulator = ((AccumulatorGetValue) annotation).value();
+ isAccumulator = true;
+ idx = 1;
+ } else if (annotation.annotationType().equals(AccumulatorCleanup.class)) {
+ //
+ // Check that method does not accept any parameters.
+ //
+ Class<?>[] params = method.getParameterTypes();
+ if (0 != params.length) {
+ throw new IOException(path + ": methods annotated with @AccumulatorCleanup take no parameters and return void.");
+ }
+ accumulator = ((AccumulatorCleanup) annotation).value();
+ idx = 2;
+ }
+
+ Method[] accumethods = accumulatorMethods.get(accumulator);
+
+ if (null == accumethods) {
+ accumethods = new Method[3];
+ accumulatorMethods.put(accumulator, accumethods);
+ }
+
+ if (null != accumethods[idx]) {
+ throw new IOException(path + ": Accumulator UDF '" + accumulator + "' already has an "
+ + annotation.annotationType().getSimpleName() + " method defined ('" + accumethods[idx] + "')");
+ }
+
+ accumethods[idx] = method;
+ }
+ }
+
+ //
+ // Only register functions which have an output schema declared
+ //
+
+ if (null == schema && null == schemaFunction) {
+ LOG.info(path
+ + ": Only methods annotated with @OutputSchema or @OutputSchemaFunction (but not with @AccumulatorGetValue are exposed to Pig, skipping method '"
+ + method.getName() + "'");
+ continue;
+ }
+
+ //
+ // Only one of OutputSchema / OutputSchemaFunction can be defined
+ //
+
+ if (null != schema && null != schemaFunction) {
+ LOG.info("Annotation @OutputSchemaFunction has precedence over @OutputSchema for method '" + method.getName() + "'");
+ }
+
+ //
+ // Register methods annotated with 'OutputSchema' or
+ // 'OutputSchemaFunction, unless they are accumulators' getValue
+ // methods
+ //
+
+ if (!isAccumulator) {
+ namespace = (namespace == null) ? "" : namespace;
+ FuncSpec spec = new FuncSpec(GroovyEvalFuncObject.class.getCanonicalName() + "('" + path + "','" + namespace
+ + "','" + method.getName() + "')");
+ pigContext.registerFunction(("".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR)) + method.getName(),
+ spec);
+ LOG.info(path + ": Register Groovy UDF: " + ("".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR))
+ + method.getName());
+ }
+ }
+ }
+
+ //
+ // Now register algebraic methods
+ //
+
+ for (String algebraic : algebraicMethods.keySet()) {
+
+ Method[] algmethods = algebraicMethods.get(algebraic);
+
+ if (null == algmethods[0]) {
+ throw new IOException(path + ": Algebratic UDF '" + algebraic + "' does not have an Initial method defined.");
+ } else if (null == algmethods[1]) {
+ throw new IOException(path + ": Algebratic UDF '" + algebraic + "' does not have an Intermed method defined.");
+ } else if (null == algmethods[2]) {
+ throw new IOException(path + ": Algebratic UDF '" + algebraic + "' does not have a Final method defined.");
+ }
+
+ //
+ // Retrieve schema of 'Final' method
+ //
+
+ String className = null;
+
+ Class<?> returnType = algmethods[2].getReturnType();
+
+ if (returnType.equals(Tuple.class) || returnType.equals(Object[].class)
+ || returnType.equals(org.apache.pig.data.Tuple.class)) {
+ className = TupleGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(List.class) || returnType.equals(DataBag.class)) {
+ className = DataBagGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(Boolean.class) || returnType.equals(boolean.class)) {
+ className = BooleanGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(byte[].class) || returnType.equals(DataByteArray.class)) {
+ className = DataByteArrayGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(String.class)) {
+ className = ChararrayGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(Double.class) || returnType.equals(double.class) || returnType.equals(BigDecimal.class)) {
+ className = DoubleGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(Float.class) || returnType.equals(float.class)) {
+ className = FloatGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(Byte.class) || returnType.equals(byte.class) || returnType.equals(Short.class)
+ || returnType.equals(short.class) || returnType.equals(Integer.class) || returnType.equals(int.class)) {
+ className = IntegerGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(Long.class) || returnType.equals(long.class) || returnType.equals(BigInteger.class)) {
+ className = LongGroovyAlgebraicEvalFunc.class.getName();
+ } else if (returnType.equals(Map.class)) {
+ className = MapGroovyAlgebraicEvalFunc.class.getName();
+ } else {
+ throw new RuntimeException(path + ": Unknown return type for Algebraic UDF '" + algebraic + "'");
+ }
+
+ FuncSpec spec = new FuncSpec(className + "('" + path + "','" + namespace + "','" + algebraic + "','"
+ + algmethods[0].getName() + "','" + algmethods[1].getName() + "','" + algmethods[2].getName() + "')");
+ pigContext.registerFunction(("".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR)) + algebraic, spec);
+
+ LOG.info("Register Groovy Algebraic UDF: " + ("".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR))
+ + algebraic);
+ }
+
+ //
+ // Now register Accumulator UDFs
+ //
+
+ for (String accumulator : accumulatorMethods.keySet()) {
+
+ Method[] accumethods = accumulatorMethods.get(accumulator);
+
+ if (null == accumethods[0]) {
+ throw new IOException(path + ": Accumulator UDF '" + accumulator + "' does not have an Accumulate method defined.");
+ } else if (null == accumethods[1]) {
+ throw new IOException(path + ": Accumulator UDF '" + accumulator + "' does not have a GetValue method defined.");
+ } else if (null == accumethods[2]) {
+ throw new IOException(path + ": Accumulator UDF '" + accumulator + "' does not have a Cleanup method defined.");
+ }
+
+ FuncSpec spec = new FuncSpec(GroovyAccumulatorEvalFunc.class.getName() + "('" + path + "','" + namespace + "','"
+ + accumulator + "','" + accumethods[0].getName() + "','" + accumethods[1].getName() + "','"
+ + accumethods[2].getName() + "')");
+ pigContext.registerFunction(("".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR)) + accumulator, spec);
+
+ LOG.info("Register Groovy Accumulator UDF: " + ("".equals(namespace) ? "" : (namespace + NAMESPACE_SEPARATOR))
+ + accumulator);
+ }
+ } catch (ResourceException re) {
+ throw new IOException(re);
+ } catch (ScriptException se) {
+ throw new IOException(se);
+ }
+
+ }
+
+ @Override
+ protected Map<String, Object> getParamsFromVariables() throws IOException {
+ return null;
+ }
+
+ @Override
+ protected String getScriptingLang() {
+ return "groovy";
+ }
+
+ protected static groovy.util.GroovyScriptEngine getEngine() {
+ return gse;
+ }
+
+ private static boolean isAlgebraic(Annotation annotation) {
+ return annotation.annotationType().equals(AlgebraicInitial.class)
+ || annotation.annotationType().equals(AlgebraicIntermed.class)
+ || annotation.annotationType().equals(AlgebraicFinal.class);
+ }
+
+ private static boolean isAccumulator(Annotation annotation) {
+ return annotation.annotationType().equals(AccumulatorAccumulate.class)
+ || annotation.annotationType().equals(AccumulatorGetValue.class)
+ || annotation.annotationType().equals(AccumulatorCleanup.class);
+ }
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyUtils.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+public class GroovyUtils {
+
+ private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ private static final BagFactory bagFactory = BagFactory.getInstance();
+
+ /**
+ * Converts an object created on the Groovy side to its Pig counterpart.
+ *
+ * The conversions are as follow:
+ *
+ * Groovy Pig
+ * Object[] Tuple
+ * groovy.lang.Tuple Tuple
+ * org.apache.pig.data.Tuple Tuple
+ * org.apache.pig.data.DataBag DataBag
+ * java.util.Map Map
+ * java.util.List DataBag
+ * Byte/Short/Integer int
+ * Long/BigInteger long
+ * Float float
+ * Double/BigDecimal double
+ * String chararray
+ * byte[] DataByteArray (copy)
+ * Boolean boolean
+ * null null
+ *
+ * anything else raises an exception
+ *
+ * @param groovyObject
+ * Groovy object to convert
+ * @return the Pig counterpart of groovyObject
+ *
+ * @throws ExecException
+ */
+ public static Object groovyToPig(Object groovyObject) throws ExecException {
+ Object pigObject = null;
+
+ if (groovyObject instanceof Object[] || groovyObject instanceof groovy.lang.Tuple) {
+ //
+ // Allocate a List<Object> that will be filled with converted
+ // objects and later passed to newTuple.
+ //
+
+ List<Object> pigObjects = new ArrayList<Object>();
+
+ //
+ // Convert each member of groovyObject
+ //
+
+ if (groovyObject instanceof Object[]) {
+ for (Object o : (Object[]) groovyObject) {
+ pigObjects.add(groovyToPig(o));
+ }
+ } else {
+ for (Object o : (Iterable) groovyObject) {
+ pigObjects.add(groovyToPig(o));
+ }
+ }
+
+ //
+ // Create the result Tuple
+ //
+
+ pigObject = tupleFactory.newTuple(pigObjects);
+ } else if (groovyObject instanceof Tuple || groovyObject instanceof DataBag) {
+ //
+ // Copy Pig Tuple/DataBag as is
+ // This enables the creation of instances of DataBag which do not fit in
+ // memory
+ //
+ // It is advised to wrap objects into a call to groovyToPig
+ // prior to adding them to a Tuple or DataBag
+ //
+
+ pigObject = groovyObject;
+ } else if (groovyObject instanceof Map) {
+ //
+ // Allocate a Map
+ //
+
+ Map<String, Object> pigMap = new HashMap<String, Object>();
+
+ //
+ // Iterate over Groovy Map, putting each entry into pigMap
+ //
+
+ for (Map.Entry<?, ?> entry : ((Map<?, ?>) groovyObject).entrySet()) {
+ pigMap.put(groovyToPig(entry.getKey()).toString(), groovyToPig(entry.getValue()));
+ }
+
+ pigObject = pigMap;
+ } else if (groovyObject instanceof List) {
+ //
+ // Allocate a DataBag
+ //
+
+ DataBag bag = bagFactory.newDefaultBag();
+
+ //
+ // Pig's bags can only contain tuples, so we cast the return value
+ // of groovyToPig to a Tuple, if it's not a tuple, a ClassCastException
+ // will
+ // be thrown.
+ //
+
+ for (Object o : (List) groovyObject) {
+ Object p = groovyToPig(o);
+
+ if (p instanceof Tuple || null == p) {
+ bag.add((Tuple) p);
+ } else {
+ // Wrap value in a Tuple if it's not already a tuple
+ bag.add(tupleFactory.newTuple(p));
+ }
+ }
+
+ pigObject = bag;
+ } else if (groovyObject instanceof Integer || groovyObject instanceof Long || groovyObject instanceof Float
+ || groovyObject instanceof Double) {
+ //
+ // Numeric types which have an equivalent in Pig are passed as is as they
+ // are immutable
+ //
+ pigObject = groovyObject;
+ } else if (groovyObject instanceof Byte || groovyObject instanceof Short) {
+ pigObject = ((Number) groovyObject).intValue();
+ } else if (groovyObject instanceof BigInteger) {
+ pigObject = ((Number) groovyObject).longValue();
+ } else if (groovyObject instanceof BigDecimal) {
+ pigObject = ((Number) groovyObject).doubleValue();
+ } else if (groovyObject instanceof byte[]) {
+ //
+ // Clone the byte array
+ //
+
+ byte[] b = new byte[((byte[]) groovyObject).length];
+ System.arraycopy((byte[]) groovyObject, 0, b, 0, b.length);
+
+ pigObject = new DataByteArray(b);
+ } else if (groovyObject instanceof String) {
+ //
+ // String is immutable, so pass it as is
+ //
+
+ pigObject = groovyObject;
+ } else if (groovyObject instanceof Boolean) {
+ pigObject = groovyObject;
+ } else if (null == groovyObject) {
+ pigObject = null;
+ } else {
+ throw new ExecException("Unable to cast " + groovyObject.getClass().getName() + " to a Pig datatype.");
+ }
+
+ return pigObject;
+ }
+
+ /**
+ * Converts an object created on the Pig side to its Groovy counterpart.
+ *
+ * The conversions are as follow:
+ *
+ * Pig Groovy
+ * Tuple groovy.lang.tuple
+ * DataBag groovy.lang.Tuple containing the bag's size and an iterator on its
+ * content
+ * Map java.util.Map
+ * int/long/float/double as is
+ * chararray String
+ * bytearray byte[] (copy)
+ * boolean boolean
+ * null null
+ *
+ * anything else raises an exception
+ *
+ * @param pigObject
+ * @return
+ * @throws ExecException
+ */
+ public static Object pigToGroovy(Object pigObject) throws ExecException {
+
+ Object groovyObject = null;
+
+ if (pigObject instanceof Tuple) {
+ Object[] a = new Object[((Tuple) pigObject).size()];
+
+ int i = 0;
+ for (Object oo : ((Tuple) pigObject).getAll()) {
+ a[i++] = pigToGroovy(oo);
+ }
+
+ groovyObject = new groovy.lang.Tuple(a);
+ } else if (pigObject instanceof DataBag) {
+ //
+ // Return a Groovy Tuple containing the bag's size and an
+ // iterator on its content (Iterator will return instances of
+ // groovy.lang.Tuple)
+ //
+
+ Object[] size_iterator = new Object[2];
+ size_iterator[0] = ((DataBag) pigObject).size();
+ size_iterator[1] = new DataBagGroovyIterator(((DataBag) pigObject).iterator());
+ groovyObject = new groovy.lang.Tuple(size_iterator);
+ } else if (pigObject instanceof Map) {
+ Map<String, Object> m = new HashMap<String, Object>();
+
+ for (Map.Entry<String, ?> entry : ((Map<String, ?>) pigObject).entrySet()) {
+ m.put((String) pigToGroovy(entry.getKey()), pigToGroovy(entry.getValue()));
+ }
+
+ groovyObject = m;
+ } else if (pigObject instanceof Number || pigObject instanceof String || pigObject instanceof Boolean) {
+ groovyObject = pigObject;
+ } else if (pigObject instanceof DataByteArray) {
+ //
+ // Allocate a new byte array so we don't use the original array
+ //
+ byte[] b = new byte[((DataByteArray) pigObject).size()];
+
+ System.arraycopy(((DataByteArray) pigObject).get(), 0, b, 0, b.length);
+
+ groovyObject = b;
+ } else if (null == pigObject) {
+ groovyObject = null;
+ } else {
+ throw new ExecException("Unable to cast pig datatype " + pigObject.getClass().getName() + " to a suitable Groovy Object.");
+ }
+
+ return groovyObject;
+ }
+
+ public static class DataBagGroovyIterator implements Iterator<groovy.lang.Tuple> {
+
+ private final Iterator<Tuple> iter;
+
+ public DataBagGroovyIterator(Iterator<Tuple> iter) {
+ this.iter = iter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public groovy.lang.Tuple next() {
+ try {
+ return (groovy.lang.Tuple) pigToGroovy(iter.next());
+ } catch (ExecException ee) {
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public void remove() {
+ }
+ }
+}
Added: pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java?rev=1362192&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java (added)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/OutputSchemaFunction.java Mon Jul 16 18:56:10 2012
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.groovy;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface OutputSchemaFunction {
+ String value();
+}