You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/29 21:41:35 UTC

[pulsar] branch master updated: fix submit function via url (#3934)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea4231  fix submit function via url (#3934)
5ea4231 is described below

commit 5ea423150ba4d879207a22899cdd6e8154e6382f
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Mar 29 16:41:31 2019 -0500

    fix submit function via url (#3934)
    
    * fix submit function via url
    
    * cleaning up
    
    * add test
    
    * make method private
    
    * add additional tests
    
    * cleaning up
    
    * improving tests
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 119 +++++++++++++++++++--
 .../org/apache/pulsar/functions/utils/Utils.java   |  44 ++------
 2 files changed, 119 insertions(+), 44 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 042a952..abd9684 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
 import lombok.ToString;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
@@ -61,11 +63,16 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
@@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest {
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
 
     private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
+    private Thread fileServerThread;
+    private static final int fileServerPort = PortManager.nextFreePort();
+    private HttpServer fileServer;
 
     @DataProvider(name = "validRoleName")
     public Object[][] validRoleName() {
@@ -213,12 +223,71 @@ public class PulsarFunctionE2ETest {
 
         System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
 
-        Thread.sleep(100);
+        // setting up simple web server to test submitting function via URL
+        fileServerThread = new Thread(() -> {
+            try {
+                fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
+                fileServer.createContext("/pulsar-io-data-generator.nar", he -> {
+                    try {
+
+                        Headers headers = he.getResponseHeaders();
+                        headers.add("Content-Type", "application/octet-stream");
+
+                        File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
+                        byte[] bytes  = new byte [(int)file.length()];
+
+                        FileInputStream fileInputStream = new FileInputStream(file);
+                        BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
+                        bufferedInputStream.read(bytes, 0, bytes.length);
+
+                        he.sendResponseHeaders(200, file.length());
+                        OutputStream outputStream = he.getResponseBody();
+                        outputStream.write(bytes, 0, bytes.length);
+                        outputStream.close();
+
+                    } catch (Exception e) {
+                        log.error("Error when downloading: {}", e, e);
+                    }
+                });
+                fileServer.createContext("/pulsar-functions-api-examples.jar", he -> {
+                    try {
+
+                        Headers headers = he.getResponseHeaders();
+                        headers.add("Content-Type", "application/octet-stream");
+
+                        File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile());
+                        byte[] bytes  = new byte [(int)file.length()];
+
+                        FileInputStream fileInputStream = new FileInputStream(file);
+                        BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
+                        bufferedInputStream.read(bytes, 0, bytes.length);
+
+                        he.sendResponseHeaders(200, file.length());
+                        OutputStream outputStream = he.getResponseBody();
+                        outputStream.write(bytes, 0, bytes.length);
+                        outputStream.close();
+
+                    } catch (Exception e) {
+                        log.error("Error when downloading: {}", e, e);
+                    }
+                });
+                fileServer.setExecutor(null); // creates a default executor
+                log.info("Starting file server...");
+                fileServer.start();
+            } catch (Exception e) {
+                log.error("Failed to start file server: ", e);
+                fileServer.stop(0);
+            }
+
+        });
+        fileServerThread.start();
     }
 
     @AfterMethod
     void shutdown() throws Exception {
         log.info("--- Shutting down ---");
+        fileServer.stop(0);
+        fileServerThread.interrupt();
         pulsarClient.close();
         admin.close();
         functionsWorkerService.stop();
@@ -309,8 +378,7 @@ public class PulsarFunctionE2ETest {
      *
      * @throws Exception
      */
-    @Test(timeOut = 20000)
-    public void testE2EPulsarFunction() throws Exception {
+    private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {
 
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
@@ -328,7 +396,6 @@ public class PulsarFunctionE2ETest {
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
 
-        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
         FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
@@ -386,7 +453,18 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
-    public void testPulsarSinkStats() throws Exception {
+    public void testE2EPulsarFunctionWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionWithUrl() throws Exception {
+        String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort);
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
+    private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -401,7 +479,6 @@ public class PulsarFunctionE2ETest {
         // create a producer that creates a topic at broker
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
-        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
         SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
         admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
 
@@ -413,7 +490,7 @@ public class PulsarFunctionE2ETest {
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 5, 150);
+        }, 50, 150);
         // validate pulsar sink consumer has started on the topic
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
@@ -586,7 +663,18 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
-    public void testPulsarSourceStats() throws Exception {
+    public void testPulsarSinkStatsWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        testPulsarSinkStats(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSinkStatsWithUrl() throws Exception {
+        String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
+        testPulsarSinkStats(jarFilePathUrl);
+    }
+
+    private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + "/output";
@@ -595,7 +683,6 @@ public class PulsarFunctionE2ETest {
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
-        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
         SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
         admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
 
@@ -615,7 +702,7 @@ public class PulsarFunctionE2ETest {
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 10, 150);
+        }, 50, 150);
         assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
 
         String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
@@ -688,6 +775,18 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
+    public void testPulsarSourceStatsWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        testPulsarSourceStats(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSourceStatsWithUrl() throws Exception {
+        String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
+        testPulsarSourceStats(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 20000)
     public void testPulsarFunctionStats() throws Exception {
 
         final String namespacePortion = "io";
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 2318f5a..38002a1 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -269,12 +269,11 @@ public class Utils {
             URL website = new URL(destPkgUrl);
             File tempFile = File.createTempFile("function", ".tmp");
             ReadableByteChannel rbc = Channels.newChannel(website.openStream());
+            log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile());
             try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-                fos.getChannel().transferFrom(rbc, 0, 10);
-            }
-            if (tempFile.exists()) {
-                tempFile.delete();
+                fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
             }
+            log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile());
             return tempFile;
         } else {
             throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
@@ -318,39 +317,16 @@ public class Utils {
                 throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
             }
         }
+
         if (!isEmpty(pkgUrl)) {
-            if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
-                try {
-                    URL url = new URL(pkgUrl);
-                    File file = new File(url.toURI());
-                    if (!file.exists()) {
-                        throw new IOException(pkgUrl + " does not exists locally");
-                    }
-                    return NarClassLoader.getFromArchive(file, Collections.emptySet());
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
-                }
-            } else if (pkgUrl.startsWith("http")) {
-                try {
-                    URL website = new URL(pkgUrl);
-                    File tempFile = File.createTempFile("function", ".tmp");
-                    ReadableByteChannel rbc = Channels.newChannel(website.openStream());
-                    try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-                        fos.getChannel().transferFrom(rbc, 0, 10);
-                    }
-                    if (tempFile.exists()) {
-                        tempFile.delete();
-                    }
-                    return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
-                }
-            } else {
-                throw new IllegalArgumentException("Unsupported url protocol "+ pkgUrl +", supported url protocols: [file/http/https]");
+            try {
+                return NarClassLoader.getFromArchive(extractFileFromPkg(pkgUrl), Collections.emptySet());
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
             }
         }
+
         if (uploadedInputStreamFileName != null) {
             try {
                 return NarClassLoader.getFromArchive(uploadedInputStreamFileName,