You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/16 16:06:39 UTC
[2/3] beam git commit: support UDF
support UDF
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95cba796
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95cba796
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95cba796
Branch: refs/heads/DSL_SQL
Commit: 95cba7960552ebfbb594aef1edb775906aec8a79
Parents: 0e08c87
Author: mingmxu <mi...@ebay.com>
Authored: Sat May 13 21:42:05 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Mon May 15 20:40:37 2017 -0700
----------------------------------------------------------------------
.../dsls/sql/interpreter/BeamSQLFnExecutor.java | 13 ++-
.../operator/BeamSqlUdfExpression.java | 86 ++++++++++++++++++++
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 5 +-
.../beam/dsls/sql/planner/BeamSqlRunner.java | 12 +++
.../operator/BeamSqlUdfExpressionTest.java | 51 ++++++++++++
.../sql/planner/BeamGroupByExplainTest.java | 11 +++
.../sql/planner/BeamGroupByPipelineTest.java | 12 +++
7 files changed, 188 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index 4b7af2a..eb9fedf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -35,6 +35,7 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
@@ -60,7 +61,9 @@ import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
/**
* Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
@@ -181,7 +184,15 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
case "SESSION_END":
return new BeamSqlWindowEndExpression();
default:
- throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
+ //handle UDF
+ if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+ SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+ ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+ return new BeamSqlUdfExpression(fn.method, subExps,
+ ((RexCall) rexNode).type.getSqlTypeName());
+ } else {
+ throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
+ }
}
} else {
throw new BeamSqlUnsupportedException(
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..d6cf987
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/** Invokes a user-defined function (UDF) implemented by a public static Java method. */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+  // Method is not Serializable, so keep Class objects (which, unlike class names fed to
+  // Class.forName, also cover primitives such as int) to rebuild it after deserialization.
+  private transient Method method;
+  private final Class<?> declaringClass;
+  private final String methodName;
+  private final Class<?>[] parameterTypes;
+
+  /**
+   * @param method public static method implementing the UDF
+   * @param subExps expressions producing the UDF arguments, in parameter order
+   * @param sqlTypeName SQL type of the UDF result
+   * @throws IllegalArgumentException if {@code method} is not static
+   */
+  public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+      SqlTypeName sqlTypeName) {
+    super(subExps, sqlTypeName);
+    if (!java.lang.reflect.Modifier.isStatic(method.getModifiers())) {
+      throw new IllegalArgumentException("UDF method must be static: " + method);
+    }
+    this.method = method;
+    this.declaringClass = method.getDeclaringClass();
+    this.methodName = method.getName();
+    this.parameterTypes = method.getParameterTypes();
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /** Evaluates each operand against {@code inputRecord} and passes the values to the UDF. */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    if (method == null) {
+      reConstructMethod(); // first call after deserialization
+    }
+    List<Object> args = new ArrayList<>();
+    for (BeamSqlExpression operand : getOperands()) {
+      args.add(operand.evaluate(inputRecord).getValue());
+    }
+    try {
+      return BeamSqlPrimitive.of(getOutputType(), method.invoke(null, args.toArray()));
+    } catch (java.lang.reflect.InvocationTargetException ex) {
+      // Surface the UDF's own exception rather than the reflection wrapper.
+      throw new RuntimeException("UDF " + methodName + " failed", ex.getCause());
+    } catch (IllegalAccessException ex) {
+      throw new IllegalStateException("UDF " + methodName + " is not accessible", ex);
+    }
+  }
+
+  /** Re-resolves {@link #method} from its declaring class, name and parameter types. */
+  private void reConstructMethod() {
+    try {
+      method = declaringClass.getMethod(methodName, parameterTypes);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 29b3f1d..9e41555 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
@@ -83,7 +84,9 @@ public class BeamQueryPlanner {
FrameworkConfig config = Frameworks.newConfigBuilder()
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
.traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
- .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+ .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .build();
this.planner = Frameworks.getPlanner(config);
for (String t : schema.getTableNames()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
index 708c507..95ba5a9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
@@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelConversionException;
@@ -65,6 +66,17 @@ public class BeamSqlRunner implements Serializable {
}
/**
+ * Registers the public STATIC method {@code methodName} of {@code className} as the SQL
+ * function {@code functionName}; throws NullPointerException if Calcite cannot resolve it.
+ * Declare parameters with wrapper types (e.g. {@code Integer}) and handle NULL arguments.
+ */
+  public void addUDFFunction(String functionName, Class<?> className, String methodName) {
+    schema.add(functionName, java.util.Objects.requireNonNull(
+        ScalarFunctionImpl.create(className, methodName),
+        "No method " + methodName + " found in " + className.getName()));
+  }
+
+ /**
* submit as a Beam pipeline.
*
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
new file mode 100644
index 0000000..71ac523
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlUdfExpression}.
+ */
+public class BeamSqlUdfExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test
+  public void testUdf() throws NoSuchMethodException, SecurityException {
+    List<BeamSqlExpression> args = new ArrayList<>();
+    args.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
+    BeamSqlUdfExpression negate = new BeamSqlUdfExpression(
+        UdfFn.class.getMethod("negative", Integer.class), args, SqlTypeName.INTEGER);
+
+    Assert.assertEquals(-10, negate.evaluate(record).getValue());
+  }
+
+  /** Sample UDF: negates its argument and maps NULL to 0. */
+  public static final class UdfFn {
+    public static int negative(Integer number) {
+      if (number == null) {
+        return 0;
+      }
+      return -number;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
index 566c574..4f2b1ba 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
import org.junit.Test;
/**
@@ -90,4 +91,14 @@ public class BeamGroupByExplainTest extends BasePlanner {
String plan = runner.explainQuery(sql);
}
+  /**
+   * Explains a query that calls the scalar UDF {@code negative}.
+   */
+  @Test
+  public void testUdf() throws Exception {
+    String query = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
+    runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+
+    runner.explainQuery(query);
+  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
index d5f8125..71dcf73 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
import org.apache.beam.sdk.Pipeline;
import org.junit.Test;
@@ -91,4 +92,15 @@ public class BeamGroupByPipelineTest extends BasePlanner {
Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
}
+  /**
+   * Compiles a Beam pipeline for a query that calls the scalar UDF {@code negative}.
+   */
+  @Test
+  public void testUdf() throws Exception {
+    String query = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
+    runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+
+    runner.getPlanner().compileBeamPipeline(query);
+  }
+
}