You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/09 20:28:58 UTC
[1/2] beam git commit: take SerializableFunction as UDF.
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 3e25ffb04 -> be01be5cb
take SerializableFunction as UDF.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffbd4c2f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffbd4c2f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffbd4c2f
Branch: refs/heads/DSL_SQL
Commit: ffbd4c2f7ae0a608f061e718aefa23d3349e34a5
Parents: 3e25ffb
Author: mingmxu <mi...@ebay.com>
Authored: Tue Aug 8 23:02:32 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Aug 9 13:27:20 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/extensions/sql/BeamSql.java | 17 +++++++++++++++++
.../apache/beam/sdk/extensions/sql/BeamSqlEnv.java | 9 +++++++++
.../interpreter/operator/BeamSqlUdfExpression.java | 4 +++-
.../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java | 13 ++++++++++++-
4 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index ac617ad..a1e9877 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -144,6 +145,14 @@ public class BeamSql {
getSqlEnv().registerUdf(functionName, clazz);
return this;
}
+ /**
+ * Register a {@link SerializableFunction} as a UDF used in this query.
+ * Note, only the class of {@code sfn} is used (instance state is dropped); it needs a no-arg constructor.
+ */
+ public QueryTransform withUdf(String functionName, SerializableFunction sfn){
+ getSqlEnv().registerUdf(functionName, sfn);
+ return this;
+ }
/**
* register a UDAF function used in this query.
@@ -213,6 +222,14 @@ public class BeamSql {
getSqlEnv().registerUdf(functionName, clazz);
return this;
}
+ /**
+ * Register a {@link SerializableFunction} as a UDF used in this query.
+ * Note, only the class of {@code sfn} is used (instance state is dropped); it needs a no-arg constructor.
+ */
+ public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){
+ getSqlEnv().registerUdf(functionName, sfn);
+ return this;
+ }
/**
* register a UDAF function used in this query.
http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index 4d21425..0737c49 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.type.RelDataType;
@@ -60,6 +61,14 @@ public class BeamSqlEnv implements Serializable{
}
/**
+ * Register a {@link SerializableFunction} as a UDF which can be used in SQL expressions.
+ * Note, only the class of {@code sfn} is used (instance state is dropped); it needs a no-arg constructor.
+ */
+ public void registerUdf(String functionName, SerializableFunction sfn) {
+ schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply"));
+ }
+
+ /**
* Register a UDAF function which can be used in GROUP-BY expression.
* See {@link BeamSqlUdaf} on how to implement a UDAF.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index f1bcb66..123e6a0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
public class BeamSqlUdfExpression extends BeamSqlExpression {
//as Method is not Serializable, need to keep class/method information, and rebuild it.
private transient Method method;
+ private transient Object udfIns;
private String className;
private String methodName;
private List<String> paraClassName = new ArrayList<>();
@@ -63,7 +64,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
}
return BeamSqlPrimitive.of(getOutputType(),
- method.invoke(null, paras.toArray(new Object[]{})));
+ method.invoke(udfIns, paras.toArray(new Object[]{})));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -78,6 +79,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
for (String pc : paraClassName) {
paraClass.add(Class.forName(pc));
}
+ udfIns = Class.forName(className).newInstance();
method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 7302376..0552cbf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -82,7 +83,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
PCollection<BeamRecord> result2 =
PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1)
.apply("testUdf2",
- BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
+ BeamSql.query(sql2).withUdf("cubic2", new CubicIntegerFn()));
PAssert.that(result2).containsInAnyOrder(record);
pipeline.run().waitUntilFinish();
@@ -131,4 +132,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
return input * input * input;
}
}
+
+ /**
+ * An example UDF with {@link SerializableFunction}.
+ */
+ public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> {
+ @Override
+ public Integer apply(Integer input) {
+ return input * input * input;
+ }
+ }
}
[2/2] beam git commit: [BEAM-2748] This closes #3707
Posted by ta...@apache.org.
[BEAM-2748] This closes #3707
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be01be5c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be01be5c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be01be5c
Branch: refs/heads/DSL_SQL
Commit: be01be5cb5ebf494718c09d0518ac77fdc2b4efd
Parents: 3e25ffb ffbd4c2
Author: Tyler Akidau <ta...@apache.org>
Authored: Wed Aug 9 13:27:52 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Aug 9 13:27:52 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/extensions/sql/BeamSql.java | 17 +++++++++++++++++
.../apache/beam/sdk/extensions/sql/BeamSqlEnv.java | 9 +++++++++
.../interpreter/operator/BeamSqlUdfExpression.java | 4 +++-
.../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java | 13 ++++++++++++-
4 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------