You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/29 21:41:35 UTC
[pulsar] branch master updated: fix submit function via url (#3934)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea4231 fix submit function via url (#3934)
5ea4231 is described below
commit 5ea423150ba4d879207a22899cdd6e8154e6382f
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Mar 29 16:41:31 2019 -0500
fix submit function via url (#3934)
* fix submit function via url
* cleaning up
* add test
* make method private
* add additional tests
* cleaning up
* improving tests
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 119 +++++++++++++++++++--
.../org/apache/pulsar/functions/utils/Utils.java | 44 ++------
2 files changed, 119 insertions(+), 44 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 042a952..abd9684 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
import lombok.ToString;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
@@ -61,11 +63,16 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.BufferedInputStream;
import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
@@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest {
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
+ private Thread fileServerThread;
+ private static final int fileServerPort = PortManager.nextFreePort();
+ private HttpServer fileServer;
@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
@@ -213,12 +223,71 @@ public class PulsarFunctionE2ETest {
System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
- Thread.sleep(100);
+ // setting up simple web sever to test submitting function via URL
+ fileServerThread = new Thread(() -> {
+ try {
+ fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
+ fileServer.createContext("/pulsar-io-data-generator.nar", he -> {
+ try {
+
+ Headers headers = he.getResponseHeaders();
+ headers.add("Content-Type", "application/octet-stream");
+
+ File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
+ byte[] bytes = new byte [(int)file.length()];
+
+ FileInputStream fileInputStream = new FileInputStream(file);
+ BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
+ bufferedInputStream.read(bytes, 0, bytes.length);
+
+ he.sendResponseHeaders(200, file.length());
+ OutputStream outputStream = he.getResponseBody();
+ outputStream.write(bytes, 0, bytes.length);
+ outputStream.close();
+
+ } catch (Exception e) {
+ log.error("Error when downloading: {}", e, e);
+ }
+ });
+ fileServer.createContext("/pulsar-functions-api-examples.jar", he -> {
+ try {
+
+ Headers headers = he.getResponseHeaders();
+ headers.add("Content-Type", "application/octet-stream");
+
+ File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile());
+ byte[] bytes = new byte [(int)file.length()];
+
+ FileInputStream fileInputStream = new FileInputStream(file);
+ BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
+ bufferedInputStream.read(bytes, 0, bytes.length);
+
+ he.sendResponseHeaders(200, file.length());
+ OutputStream outputStream = he.getResponseBody();
+ outputStream.write(bytes, 0, bytes.length);
+ outputStream.close();
+
+ } catch (Exception e) {
+ log.error("Error when downloading: {}", e, e);
+ }
+ });
+ fileServer.setExecutor(null); // creates a default executor
+ log.info("Starting file server...");
+ fileServer.start();
+ } catch (Exception e) {
+ log.error("Failed to start file server: ", e);
+ fileServer.stop(0);
+ }
+
+ });
+ fileServerThread.start();
}
@AfterMethod
void shutdown() throws Exception {
log.info("--- Shutting down ---");
+ fileServer.stop(0);
+ fileServerThread.interrupt();
pulsarClient.close();
admin.close();
functionsWorkerService.stop();
@@ -309,8 +378,7 @@ public class PulsarFunctionE2ETest {
*
* @throws Exception
*/
- @Test(timeOut = 20000)
- public void testE2EPulsarFunction() throws Exception {
+ private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
@@ -328,7 +396,6 @@ public class PulsarFunctionE2ETest {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
- String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
@@ -386,7 +453,18 @@ public class PulsarFunctionE2ETest {
}
@Test(timeOut = 20000)
- public void testPulsarSinkStats() throws Exception {
+ public void testE2EPulsarFunctionWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+ testE2EPulsarFunction(jarFilePathUrl);
+ }
+
+ @Test(timeOut = 40000)
+ public void testE2EPulsarFunctionWithUrl() throws Exception {
+ String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort);
+ testE2EPulsarFunction(jarFilePathUrl);
+ }
+
+ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -401,7 +479,6 @@ public class PulsarFunctionE2ETest {
// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
- String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
@@ -413,7 +490,7 @@ public class PulsarFunctionE2ETest {
} catch (PulsarAdminException e) {
return false;
}
- }, 5, 150);
+ }, 50, 150);
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
@@ -586,7 +663,18 @@ public class PulsarFunctionE2ETest {
}
@Test(timeOut = 20000)
- public void testPulsarSourceStats() throws Exception {
+ public void testPulsarSinkStatsWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ testPulsarSinkStats(jarFilePathUrl);
+ }
+
+ @Test(timeOut = 40000)
+ public void testPulsarSinkStatsWithUrl() throws Exception {
+ String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
+ testPulsarSinkStats(jarFilePathUrl);
+ }
+
+ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
@@ -595,7 +683,6 @@ public class PulsarFunctionE2ETest {
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
- String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
@@ -615,7 +702,7 @@ public class PulsarFunctionE2ETest {
} catch (PulsarAdminException e) {
return false;
}
- }, 10, 150);
+ }, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
@@ -688,6 +775,18 @@ public class PulsarFunctionE2ETest {
}
@Test(timeOut = 20000)
+ public void testPulsarSourceStatsWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ testPulsarSourceStats(jarFilePathUrl);
+ }
+
+ @Test(timeOut = 40000)
+ public void testPulsarSourceStatsWithUrl() throws Exception {
+ String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
+ testPulsarSourceStats(jarFilePathUrl);
+ }
+
+ @Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {
final String namespacePortion = "io";
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 2318f5a..38002a1 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -269,12 +269,11 @@ public class Utils {
URL website = new URL(destPkgUrl);
File tempFile = File.createTempFile("function", ".tmp");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
+ log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile());
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
- fos.getChannel().transferFrom(rbc, 0, 10);
- }
- if (tempFile.exists()) {
- tempFile.delete();
+ fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
}
+ log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile());
return tempFile;
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
@@ -318,39 +317,16 @@ public class Utils {
throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
}
}
+
if (!isEmpty(pkgUrl)) {
- if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
- try {
- URL url = new URL(pkgUrl);
- File file = new File(url.toURI());
- if (!file.exists()) {
- throw new IOException(pkgUrl + " does not exists locally");
- }
- return NarClassLoader.getFromArchive(file, Collections.emptySet());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
- }
- } else if (pkgUrl.startsWith("http")) {
- try {
- URL website = new URL(pkgUrl);
- File tempFile = File.createTempFile("function", ".tmp");
- ReadableByteChannel rbc = Channels.newChannel(website.openStream());
- try (FileOutputStream fos = new FileOutputStream(tempFile)) {
- fos.getChannel().transferFrom(rbc, 0, 10);
- }
- if (tempFile.exists()) {
- tempFile.delete();
- }
- return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
- }
- } else {
- throw new IllegalArgumentException("Unsupported url protocol "+ pkgUrl +", supported url protocols: [file/http/https]");
+ try {
+ return NarClassLoader.getFromArchive(extractFileFromPkg(pkgUrl), Collections.emptySet());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
}
+
if (uploadedInputStreamFileName != null) {
try {
return NarClassLoader.getFromArchive(uploadedInputStreamFileName,