You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/16 16:06:39 UTC

[2/3] beam git commit: support UDF

Support user-defined functions (UDFs) in Beam SQL.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95cba796
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95cba796
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95cba796

Branch: refs/heads/DSL_SQL
Commit: 95cba7960552ebfbb594aef1edb775906aec8a79
Parents: 0e08c87
Author: mingmxu <mi...@ebay.com>
Authored: Sat May 13 21:42:05 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Mon May 15 20:40:37 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 13 ++-
 .../operator/BeamSqlUdfExpression.java          | 86 ++++++++++++++++++++
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |  5 +-
 .../beam/dsls/sql/planner/BeamSqlRunner.java    | 12 +++
 .../operator/BeamSqlUdfExpressionTest.java      | 51 ++++++++++++
 .../sql/planner/BeamGroupByExplainTest.java     | 11 +++
 .../sql/planner/BeamGroupByPipelineTest.java    | 12 +++
 7 files changed, 188 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index 4b7af2a..eb9fedf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -35,6 +35,7 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
@@ -60,7 +61,9 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 
 /**
  * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
@@ -181,7 +184,15 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
       case "SESSION_END":
         return new BeamSqlWindowEndExpression();
       default:
-        throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
+        // handle UDF: only scalar UDFs backed by a Java method (ScalarFunctionImpl) are supported
+        SqlOperator udf = ((RexCall) rexNode).getOperator();
+        if (udf instanceof SqlUserDefinedFunction
+            && ((SqlUserDefinedFunction) udf).getFunction() instanceof ScalarFunctionImpl) {
+          ScalarFunctionImpl fn =
+              (ScalarFunctionImpl) ((SqlUserDefinedFunction) udf).getFunction();
+          return new BeamSqlUdfExpression(fn.method, subExps, rexNode.getType().getSqlTypeName());
+        }
+        throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
       }
     } else {
       throw new BeamSqlUnsupportedException(

http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..d6cf987
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Invokes a user-defined function (UDF) implemented by a public static Java method.
+ *
+ * <p>{@link Method} is not serializable, so only the declaring class, method name and parameter
+ * types are kept; the {@link Method} is looked up again lazily when it is missing, e.g. after
+ * deserialization. Parameter types are stored as {@link Class} objects instead of class names
+ * because {@code Class.forName} cannot resolve primitive types such as {@code int}.
+ */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+  private transient Method method;
+  private final Class<?> declaringClass;
+  private final String methodName;
+  private final Class<?>[] parameterTypes;
+
+  /**
+   * Creates an expression that calls {@code method} with the values of {@code subExps}.
+   *
+   * @param method the UDF implementation; must be static since it is invoked without a receiver
+   * @param subExps expressions producing the UDF arguments, in parameter order
+   * @param sqlTypeName SQL type of the UDF result
+   * @throws IllegalArgumentException if {@code method} is not static
+   */
+  public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+      SqlTypeName sqlTypeName) {
+    super(subExps, sqlTypeName);
+    if (!Modifier.isStatic(method.getModifiers())) {
+      throw new IllegalArgumentException("UDF method must be static: " + method);
+    }
+    this.method = method;
+    this.declaringClass = method.getDeclaringClass();
+    this.methodName = method.getName();
+    this.parameterTypes = method.getParameterTypes();
+  }
+
+  /**
+   * Always accepts; the operands are assumed to have been checked against the UDF signature by
+   * the SQL planner.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Evaluates every operand against {@code inputRecord} and passes the values to the UDF.
+   *
+   * @throws RuntimeException wrapping the exception thrown by the UDF, if any
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    if (method == null) {
+      reConstructMethod();
+    }
+    List<Object> args = new ArrayList<>();
+    for (BeamSqlExpression operand : getOperands()) {
+      args.add(operand.evaluate(inputRecord).getValue());
+    }
+    try {
+      return BeamSqlPrimitive.of(getOutputType(), method.invoke(null, args.toArray()));
+    } catch (InvocationTargetException e) {
+      // Report the exception thrown by the UDF itself rather than the reflection wrapper.
+      throw new RuntimeException("Error invoking UDF " + methodName, e.getCause());
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Cannot access UDF " + methodName, e);
+    }
+  }
+
+  /**
+   * Looks up the UDF {@link Method} again from the stored class, name and parameter types.
+   */
+  private void reConstructMethod() {
+    try {
+      method = declaringClass.getMethod(methodName, parameterTypes);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException(
+          "Cannot find UDF " + declaringClass.getName() + "." + methodName, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 29b3f1d..9e41555 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
@@ -83,7 +84,9 @@ public class BeamQueryPlanner {
     FrameworkConfig config = Frameworks.newConfigBuilder()
         .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
         .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+        .build();
     this.planner = Frameworks.getPlanner(config);
 
     for (String t : schema.getTableNames()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
index 708c507..95ba5a9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
@@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.RelConversionException;
@@ -65,6 +66,29 @@ public class BeamSqlRunner implements Serializable {
   }
 
   /**
+   * Registers a static Java method as a SQL user-defined function (UDF).
+   *
+   * <p>There are two requirements for function {@code methodName}:<br>
+   * 1. It must be a public STATIC method;<br>
+   * 2. For a primitive parameter, use its wrapper class and handle NULL properly.
+   *
+   * @param functionName name under which the UDF can be called from SQL
+   * @param className class that declares the UDF method
+   * @param methodName name of the method implementing the UDF
+   * @throws IllegalArgumentException if no suitable method named {@code methodName} is found
+   */
+  public void addUDFFunction(String functionName, Class<?> className, String methodName){
+    org.apache.calcite.schema.ScalarFunction function =
+        ScalarFunctionImpl.create(className, methodName);
+    if (function == null) {
+      // ScalarFunctionImpl.create signals a missing/unsuitable method by returning null.
+      throw new IllegalArgumentException("Cannot create UDF " + functionName + " from method "
+          + className.getName() + "." + methodName);
+    }
+    schema.add(functionName, function);
+  }
+
+  /**
    * submit as a Beam pipeline.
    *
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
new file mode 100644
index 0000000..71ac523
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlUdfExpression}.
+ */
+public class BeamSqlUdfExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  /**
+   * Evaluates {@link UdfFn#negative} on the constant {@code 10} and expects {@code -10}.
+   */
+  @Test
+  public void testUdf() throws NoSuchMethodException, SecurityException {
+    List<BeamSqlExpression> args = new ArrayList<>();
+    args.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
+    BeamSqlUdfExpression negate = new BeamSqlUdfExpression(
+        UdfFn.class.getMethod("negative", Integer.class), args, SqlTypeName.INTEGER);
+
+    Object actual = negate.evaluate(record).getValue();
+    Assert.assertEquals(-10, actual);
+  }
+
+  /**
+   * Sample UDF used by the UDF tests: returns the negation of {@code number}, or {@code 0} when
+   * it is {@code null}.
+   */
+  public static final class UdfFn {
+    public static int negative(Integer number) {
+      if (number == null) {
+        return 0;
+      }
+      return -number;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
index 566c574..4f2b1ba 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
 import org.junit.Test;
 
 /**
@@ -90,4 +91,14 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String plan = runner.explainQuery(sql);
   }
 
+  /**
+   * Explains a query calling the UDF {@code negative}, backed by
+   * {@link BeamSqlUdfExpressionTest.UdfFn#negative}.
+   */
+  @Test
+  public void testUdf() throws Exception {
+    runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+    String query = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
+    runner.explainQuery(query);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
index d5f8125..71dcf73 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
 import org.apache.beam.sdk.Pipeline;
 import org.junit.Test;
 
@@ -91,4 +92,15 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
   }
 
+  /**
+   * Compiles a pipeline for a query calling the UDF {@code negative}, backed by
+   * {@link BeamSqlUdfExpressionTest.UdfFn#negative}.
+   */
+  @Test
+  public void testUdf() throws Exception {
+    runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+    String query = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
+    runner.getPlanner().compileBeamPipeline(query);
+  }
+
 }