You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/30 14:09:37 UTC

[pulsar] branch master updated: Fix: check function implements correct interface (#4844)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9136b  Fix: check function implements correct interface (#4844)
3c9136b is described below

commit 3c9136b25d8941470a6744943ab762d250e97691
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Jul 30 07:09:30 2019 -0700

    Fix: check function implements correct interface (#4844)
---
 .../functions/utils/FunctionConfigUtils.java       | 14 +++++++++++
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 29 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 842da22..43e0346 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -369,6 +369,20 @@ public class FunctionConfigUtils {
 
     private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
 
+        try {
+            Class functionClass = clsLoader.loadClass(functionConfig.getClassName());
+
+            if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
+                    && !java.util.function.Function.class.isAssignableFrom(functionClass)) {
+                throw new IllegalArgumentException(
+                        String.format("Function class %s does not implement the correct interface",
+                                functionClass.getName()));
+            }
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(
+                    String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
+        }
+
         Class<?>[] typeArgs;
         try {
             typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index e5df566..5f90945 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
@@ -109,6 +110,13 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
+    private static final class WrongFunction implements Consumer<String> {
+        @Override
+        public void accept(String s) {
+
+        }
+    }
+
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String function = "test-function";
@@ -438,6 +446,27 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function class .*. does not implement the correct interface")
+    public void testRegisterFunctionImplementWrongInterface() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    WrongFunction.class.getName(),
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
     private void testRegisterFunctionMissingArguments(
             String tenant,
             String namespace,