You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/17 15:14:24 UTC
[1/2] beam git commit: proposal for new UDF
Repository: beam
Updated Branches:
refs/heads/DSL_SQL a452b8020 -> bed209e41
proposal for new UDF
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a49e4783
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a49e4783
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a49e4783
Branch: refs/heads/DSL_SQL
Commit: a49e47830e5689b9b392d23813626b1cf9636ca6
Parents: 5fea746
Author: mingmxu <mi...@ebay.com>
Authored: Thu Jul 13 23:22:14 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Thu Jul 13 23:22:14 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 9 +++--
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 5 ++-
.../apache/beam/dsls/sql/schema/BeamSqlUdf.java | 41 ++++++++++++++++++++
.../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 9 +++--
4 files changed, 54 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index ec3799c..d902f42 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -128,8 +129,8 @@ public class BeamSql {
/**
* register a UDF function used in this query.
*/
- public QueryTransform withUdf(String functionName, Class<?> clazz, String methodName){
- getSqlEnv().registerUdf(functionName, clazz, methodName);
+ public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ getSqlEnv().registerUdf(functionName, clazz);
return this;
}
@@ -196,8 +197,8 @@ public class BeamSql {
/**
* register a UDF function used in this query.
*/
- public SimpleQueryTransform withUdf(String functionName, Class<?> clazz, String methodName){
- getSqlEnv().registerUdf(functionName, clazz, methodName);
+ public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ getSqlEnv().registerUdf(functionName, clazz);
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index 61f0355..e8c8c97 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
@@ -55,8 +56,8 @@ public class BeamSqlEnv implements Serializable{
/**
* Register a UDF function which can be used in SQL expression.
*/
- public void registerUdf(String functionName, Class<?> clazz, String methodName) {
- schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
+ public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+ schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
new file mode 100644
index 0000000..2066353
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method named {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction implements BeamSqlUdf {
+ *   public static String eval(
+ *       {@literal @}Parameter(name = "s") String s,
+ *       {@literal @}Parameter(name = "n", optional = true) Integer n) {
+ *     return s.substring(0, n == null ? 1 : n);
+ *   }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+  String UDF_METHOD = "eval"; // method name BeamSqlEnv.registerUdf passes to ScalarFunctionImpl.create
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
index ba3e87e..332a273 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -78,14 +79,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
PCollection<BeamSqlRow> result1 =
boundedInput1.apply("testUdf1",
- BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class, "cubic"));
+ BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
PAssert.that(result1).containsInAnyOrder(record);
String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
PCollection<BeamSqlRow> result2 =
PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
.apply("testUdf2",
- BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class, "cubic"));
+ BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
PAssert.that(result2).containsInAnyOrder(record);
pipeline.run().waitUntilFinish();
@@ -129,8 +130,8 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
/**
* A example UDF for test.
*/
- public static class CubicInteger{
- public static Integer cubic(Integer input){
+ public static class CubicInteger implements BeamSqlUdf{
+ public static Integer eval(Integer input){
return input * input * input;
}
}
[2/2] beam git commit: This closes #3561
Posted by jb...@apache.org.
This closes #3561
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bed209e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bed209e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bed209e4
Branch: refs/heads/DSL_SQL
Commit: bed209e41a3b8c7380aa3bee22833b5b5583a235
Parents: a452b80 a49e478
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jul 17 17:14:12 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jul 17 17:14:12 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 9 +++--
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 5 ++-
.../apache/beam/dsls/sql/schema/BeamSqlUdf.java | 41 ++++++++++++++++++++
.../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 9 +++--
4 files changed, 54 insertions(+), 10 deletions(-)
----------------------------------------------------------------------