You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlinked word "here") was not preserved in this text.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/07/11 17:26:38 UTC

[samza] branch master updated: SAMZA-2230: Support for ANY type arguments in udfs that expect strongly typed arguments (#1060)

This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5071629  SAMZA-2230: Support for ANY type arguments in udfs that expect strongly typed arguments (#1060)
5071629 is described below

commit 5071629cdb432143363223259b72c2dde1278271
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Thu Jul 11 10:26:33 2019 -0700

    SAMZA-2230: Support for ANY type arguments in udfs that expect strongly typed arguments (#1060)
    
    * Support for ANY type arguments in udfs that expect strongly typed arguments
    
    * Rename the testcase
    
    * Adding license
    
    * Fixed the type name in log
    
    * Fix for the testcase
---
 .../sql/planner/SamzaSqlScalarFunctionImpl.java     | 15 ++++++++++++++-
 .../sql/runner/TestSamzaSqlApplicationConfig.java   |  3 +--
 .../sql/util/{MyTestUdf.java => MyTestObjUdf.java}  | 21 ++++++---------------
 .../org/apache/samza/sql/util/MyTestPolyUdf.java    |  3 +--
 .../java/org/apache/samza/sql/util/MyTestUdf.java   |  6 ------
 .../apache/samza/sql/util/SamzaSqlTestConfig.java   |  3 ++-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java   | 20 ++++++++++++++++++++
 7 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
index 0793bce..b2b6119 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.planner;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -81,8 +82,20 @@ public class SamzaSqlScalarFunctionImpl implements ScalarFunction, Implementable
       final Expression samzaContext = Expressions.parameter(SamzaSqlExecutionContext.class, "context");
       final Expression getUdfInstance = Expressions.call(ScalarUdf.class, sqlContext, getUdfMethod,
           Expressions.constant(udfMethod.getDeclaringClass().getName()), Expressions.constant(udfName), samzaContext);
+
+      List<Expression> convertedOperands = new ArrayList<>();
+      // SAMZA: 2230 To allow UDFS to accept Untyped arguments.
+      // We explicitly Convert the untyped arguments to type that the UDf expects.
+      for(int index = 0; index < translatedOperands.size(); index++) {
+        if (translatedOperands.get(index).type == Object.class && udfMethod.getParameters()[index].getType() != Object.class) {
+          convertedOperands.add(Expressions.convert_(translatedOperands.get(index), udfMethod.getParameters()[index].getType()));
+        } else {
+          convertedOperands.add(translatedOperands.get(index));
+        }
+      }
+
       final Expression callExpression = Expressions.call(Expressions.convert_(getUdfInstance, udfMethod.getDeclaringClass()), udfMethod,
-          translatedOperands);
+          convertedOperands);
       return callExpression;
     }, NullPolicy.NONE, false);
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index c6fb357..7db38f7 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -55,8 +55,7 @@ public class TestSamzaSqlApplicationConfig {
             .collect(Collectors.toList()),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
-    // Two of the UDFs has an overload, hence + 1.
-    Assert.assertEquals(numUdfs + 2, samzaSqlApplicationConfig.getUdfMetadata().size());
+    Assert.assertEquals(numUdfs + 1, samzaSqlApplicationConfig.getUdfMetadata().size());
     Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
     Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
similarity index 77%
copy from samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
copy to samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
index d0ac517..14a68f4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
@@ -20,7 +20,6 @@
 package org.apache.samza.sql.util;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
 import org.apache.samza.sql.schema.SamzaSqlFieldType;
 import org.apache.samza.sql.udfs.SamzaSqlUdf;
 import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
@@ -32,26 +31,18 @@ import org.slf4j.LoggerFactory;
 /**
  * Test UDF used by unit and integration tests.
  */
-@SamzaSqlUdf(name = "MyTest", description = "Test UDF.")
-public class MyTestUdf implements ScalarUdf {
+@SamzaSqlUdf(name = "MyTestObj", description = "Test UDF.")
+public class MyTestObjUdf implements ScalarUdf {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MyTestObjUdf.class);
 
   @SamzaSqlUdfMethod(params = SamzaSqlFieldType.INT32)
-  public Integer execute(Integer value) {
-    return value * 2;
+  public Object execute(Integer value) {
+    return value;
   }
 
-  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
-  public Integer execute(Object value) {
-    return ((Integer) value) * 2;
-  }
-
-
   @Override
-  public void init(Config udfConfig, Context context) {
+  public void init(Config udfConfig) {
     LOG.info("Init called with {}", udfConfig);
   }
 }
-
-
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
index 29769c0..11f6b1b 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
@@ -40,12 +40,11 @@ public class MyTestPolyUdf implements ScalarUdf {
     return value * 2;
   }
 
-  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
+  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.STRING)
   public Integer execute(String value) {
     return value.length() * 2;
   }
 
-
   @Override
   public void init(Config udfConfig, Context context) {
     LOG.info("Init called with {}", udfConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
index d0ac517..983df51 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
@@ -42,12 +42,6 @@ public class MyTestUdf implements ScalarUdf {
     return value * 2;
   }
 
-  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
-  public Integer execute(Object value) {
-    return ((Integer) value) * 2;
-  }
-
-
   @Override
   public void init(Config udfConfig, Context context) {
     LOG.info("Init called with {}", udfConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index 364d0a9..a5b03a0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -96,7 +96,8 @@ public class SamzaSqlTestConfig {
         ConfigBasedUdfResolver.class.getName());
     staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
         .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
-            MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName(), MyTestPolyUdf.class.getName()));
+            MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName(), MyTestPolyUdf.class.getName(),
+            MyTestObjUdf.class.getName()));
 
     String avroSystemConfigPrefix =
         String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 6b95b27..aef7926 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -479,6 +479,34 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
   }
 
+  /**
+   * Verifies that the untyped (Object) result of MyTestObj can be passed to MyTest, whose execute method takes
+   * an Integer, i.e. that the planner converts the untyped operand to the UDF's declared parameter type.
+   */
   @Test
+  public void testUdfUnTypedArgumentToTypedUdf() {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + "select id, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    LOG.info("output Messages " + TestAvroSystemFactory.messages);
+
+    List<GenericRecord> outRecords = TestAvroSystemFactory.messages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outRecords.size());
+    // MyTestObj returns its argument unchanged and MyTest doubles it, so every record needs long_value == 2 * id.
+    for (GenericRecord record : outRecords) {
+      int id = Integer.parseInt(record.get("id").toString());
+      Assert.assertEquals(2 * id, Integer.parseInt(record.get("long_value").toString()));
+    }
+  }
+
+  @Test
   public void testEndToEndUdf() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();