You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@samza.apache.org by sr...@apache.org on 2019/07/11 17:26:38 UTC
[samza] branch master updated: SAMZA-2230: Support for ANY type
arguments in udfs that expect strongly typed arguments (#1060)
This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5071629 SAMZA-2230: Support for ANY type arguments in udfs that expect strongly typed arguments (#1060)
5071629 is described below
commit 5071629cdb432143363223259b72c2dde1278271
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Thu Jul 11 10:26:33 2019 -0700
SAMZA-2230: Support for ANY type arguments in udfs that expect strongly typed arguments (#1060)
* Support for ANY type arguments in udfs that expect strongly typed arguments
* Rename the testcase
* Adding license
* Fixed the type name in log
* Fix for the testcase
---
.../sql/planner/SamzaSqlScalarFunctionImpl.java | 15 ++++++++++++++-
.../sql/runner/TestSamzaSqlApplicationConfig.java | 3 +--
.../sql/util/{MyTestUdf.java => MyTestObjUdf.java} | 21 ++++++---------------
.../org/apache/samza/sql/util/MyTestPolyUdf.java | 3 +--
.../java/org/apache/samza/sql/util/MyTestUdf.java | 6 ------
.../apache/samza/sql/util/SamzaSqlTestConfig.java | 3 ++-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 20 ++++++++++++++++++++
7 files changed, 44 insertions(+), 27 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
index 0793bce..b2b6119 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
@@ -20,6 +20,7 @@
package org.apache.samza.sql.planner;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -81,8 +82,20 @@ public class SamzaSqlScalarFunctionImpl implements ScalarFunction, Implementable
final Expression samzaContext = Expressions.parameter(SamzaSqlExecutionContext.class, "context");
final Expression getUdfInstance = Expressions.call(ScalarUdf.class, sqlContext, getUdfMethod,
Expressions.constant(udfMethod.getDeclaringClass().getName()), Expressions.constant(udfName), samzaContext);
+
+ List<Expression> convertedOperands = new ArrayList<>();
+ // SAMZA-2230: allow UDFs to accept untyped (ANY) arguments.
+ // Operands typed as Object are explicitly converted to the parameter type the UDF method declares (unless that parameter is itself Object).
+ for(int index = 0; index < translatedOperands.size(); index++) {
+ if (translatedOperands.get(index).type == Object.class && udfMethod.getParameters()[index].getType() != Object.class) {
+ convertedOperands.add(Expressions.convert_(translatedOperands.get(index), udfMethod.getParameters()[index].getType()));
+ } else {
+ convertedOperands.add(translatedOperands.get(index));
+ }
+ }
+
final Expression callExpression = Expressions.call(Expressions.convert_(getUdfInstance, udfMethod.getDeclaringClass()), udfMethod,
- translatedOperands);
+ convertedOperands);
return callExpression;
}, NullPolicy.NONE, false);
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index c6fb357..7db38f7 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -55,8 +55,7 @@ public class TestSamzaSqlApplicationConfig {
.collect(Collectors.toList()),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
- // Two of the UDFs has an overload, hence + 1.
- Assert.assertEquals(numUdfs + 2, samzaSqlApplicationConfig.getUdfMetadata().size());
+ Assert.assertEquals(numUdfs + 1, samzaSqlApplicationConfig.getUdfMetadata().size());
Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
similarity index 77%
copy from samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
copy to samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
index d0ac517..14a68f4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
@@ -20,7 +20,6 @@
package org.apache.samza.sql.util;
import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.udfs.SamzaSqlUdf;
import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
@@ -32,26 +31,18 @@ import org.slf4j.LoggerFactory;
/**
* Test UDF used by unit and integration tests.
*/
-@SamzaSqlUdf(name = "MyTest", description = "Test UDF.")
-public class MyTestUdf implements ScalarUdf {
+@SamzaSqlUdf(name = "MyTestObj", description = "Test UDF.")
+public class MyTestObjUdf implements ScalarUdf {
- private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MyTestObjUdf.class);
@SamzaSqlUdfMethod(params = SamzaSqlFieldType.INT32)
- public Integer execute(Integer value) {
- return value * 2;
+ public Object execute(Integer value) {
+ return value;
}
- @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
- public Integer execute(Object value) {
- return ((Integer) value) * 2;
- }
-
-
@Override
- public void init(Config udfConfig, Context context) {
+ public void init(Config udfConfig) {
LOG.info("Init called with {}", udfConfig);
}
}
-
-
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
index 29769c0..11f6b1b 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
@@ -40,12 +40,11 @@ public class MyTestPolyUdf implements ScalarUdf {
return value * 2;
}
- @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
+ @SamzaSqlUdfMethod(params = SamzaSqlFieldType.STRING)
public Integer execute(String value) {
return value.length() * 2;
}
-
@Override
public void init(Config udfConfig, Context context) {
LOG.info("Init called with {}", udfConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
index d0ac517..983df51 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
@@ -42,12 +42,6 @@ public class MyTestUdf implements ScalarUdf {
return value * 2;
}
- @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
- public Integer execute(Object value) {
- return ((Integer) value) * 2;
- }
-
-
@Override
public void init(Config udfConfig, Context context) {
LOG.info("Init called with {}", udfConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index 364d0a9..a5b03a0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -96,7 +96,8 @@ public class SamzaSqlTestConfig {
ConfigBasedUdfResolver.class.getName());
staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
.join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
- MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName(), MyTestPolyUdf.class.getName()));
+ MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName(), MyTestPolyUdf.class.getName(),
+ MyTestObjUdf.class.getName()));
String avroSystemConfigPrefix =
String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 6b95b27..aef7926 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -479,6 +479,26 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
}
@Test
+ public void testUdfUnTypedArgumentToTypedUdf() {
+ int numMessages = 20;
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ + "select id, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1";
+ List<String> sqlStmts = Collections.singletonList(sql1);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ runApplication(new MapConfig(staticConfigs));
+
+ LOG.info("output Messages " + TestAvroSystemFactory.messages);
+
+ List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("long_value").toString()))
+ .sorted()
+ .collect(Collectors.toList());
+ Assert.assertEquals(outMessages.size(), numMessages);
+ }
+
+ @Test
public void testEndToEndUdf() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();