You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/05 06:22:29 UTC
[pulsar] branch master updated: Function localrun improved (#4453)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f56d30d Function localrun improved (#4453)
f56d30d is described below
commit f56d30ddf40ef9eec172db1c0f78cc2143151bdb
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Jun 4 23:22:23 2019 -0700
Function localrun improved (#4453)
* fix localrun and add tests
* minor fix
* Improving localrun for functions
* improving impl and adding tests
* add missing modules
---
bin/function-localrunner | 2 +-
distribution/server/pom.xml | 13 +
pulsar-broker/pom.xml | 7 +
.../worker/PulsarFunctionLocalRunTest.java | 719 +++++++++++++++++++++
pulsar-functions/localrun-shaded/pom.xml | 374 +++++++++++
pulsar-functions/localrun/pom.xml | 73 +++
.../org/apache/pulsar/functions/LocalRunner.java | 466 +++++++++++++
pulsar-functions/pom.xml | 2 +
.../pulsar/functions/runtime/LocalRunner.java | 283 --------
9 files changed, 1655 insertions(+), 284 deletions(-)
diff --git a/bin/function-localrunner b/bin/function-localrunner
index 16362b4..a7aad76 100755
--- a/bin/function-localrunner
+++ b/bin/function-localrunner
@@ -59,7 +59,7 @@ fi
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
-MAINCLASS="org.apache.pulsar.functions.runtime.LocalRunner"
+MAINCLASS="org.apache.pulsar.functions.LocalRunner"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 42b7300..a40538c 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -164,6 +164,19 @@
</exclusions>
</dependency>
+ <!-- local-runner -->
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-local-runner-original</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- grpc -->
<dependency>
<groupId>io.grpc</groupId>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 331f2d4..85b88ac 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -138,6 +138,13 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-local-runner-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- functions related dependencies (end) -->
<dependency>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
new file mode 100644
index 0000000..bdcd27d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink;
+import org.apache.pulsar.io.datagenerator.DataGeneratorSource;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * End-to-end tests that run a Pulsar function, source and sink through {@link LocalRunner} against a locally started broker.
+ *
+ */
+public class PulsarFunctionLocalRunTest {
+ LocalBookkeeperEnsemble bkEnsemble;
+
+ ServiceConfiguration config;
+ WorkerConfig workerConfig;
+ URL urlTls;
+ PulsarService pulsar;
+ PulsarAdmin admin;
+ PulsarClient pulsarClient;
+ BrokerStats brokerStatsClient;
+ WorkerService functionsWorkerService;
+ final String tenant = "external-repl-prop";
+ String pulsarFunctionsNamespace = tenant + "/" + CLUSTER + "/pulsar-function-admin";
+ String primaryHost;
+ String workerId;
+
+ private static final String CLUSTER = "local";
+
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final int brokerWebServicePort = PortManager.nextFreePort();
+ private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
+ private final int brokerServicePort = PortManager.nextFreePort();
+ private final int brokerServiceTlsPort = PortManager.nextFreePort();
+ private final int workerServicePort = PortManager.nextFreePort();
+
+ private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+ private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+ private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+ private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+ private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+
+ private static final Logger log = LoggerFactory.getLogger(PulsarFunctionLocalRunTest.class);
+ private Thread fileServerThread;
+ private static final int fileServerPort = PortManager.nextFreePort();
+ private HttpServer fileServer;
+
+    /**
+     * Data provider named "validRoleName": yields two rows, the first carrying
+     * {@code Boolean.TRUE} and the second {@code Boolean.FALSE}.
+     */
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        final Object[] trueRow = { Boolean.TRUE };
+        final Object[] falseRow = { Boolean.FALSE };
+        return new Object[][] { trueRow, falseRow };
+    }
+
+    /**
+     * Prepares the environment for a single test method.
+     *
+     * <p>Removes leftover "function*" files from the temp directory, starts a local bookkeeper
+     * ensemble and a broker with TLS authentication/authorization plus an embedded functions
+     * worker, creates the admin and client handles, registers the cluster and the test tenant,
+     * and finally starts a small HTTP server that serves the test archives from the classpath.
+     *
+     * @param method the test method about to run (used for logging only)
+     * @throws Exception if any component fails to start
+     */
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        // delete all function temp files
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith("function");
+            }
+        });
+
+        // File.listFiles returns null (not an empty array) when the directory cannot be listed
+        if (foundFiles != null) {
+            for (File file : foundFiles) {
+                file.delete();
+            }
+        }
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName(CLUSTER);
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(Optional.ofNullable(brokerWebServicePort));
+        config.setWebServicePortTls(Optional.ofNullable(brokerWebServiceTlsPort));
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(Optional.ofNullable(brokerServicePort));
+        config.setBrokerServicePortTls(Optional.ofNullable(brokerServiceTlsPort));
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
+        config.setAdvertisedAddress("localhost");
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+
+        config.setAuthorizationEnabled(true);
+        config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        config.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        config.setBrokerClientTlsEnabled(true);
+
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        urlTls = new URL(brokerServiceUrl);
+        Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        admin = spy(
+                PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                        .allowTlsInsecureConnection(true).authentication(authTls).build());
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(urlTls.toString());
+        admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+            clientBuilder.enableTls(workerConfig.isUseTls());
+            clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+            clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                    workerConfig.getClientAuthenticationParameters());
+        }
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
+        propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList(CLUSTER)));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        // setting up simple web server to test submitting function via URL
+        fileServerThread = new Thread(() -> {
+            try {
+                fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
+                fileServer.createContext("/pulsar-io-data-generator.nar",
+                        he -> serveClasspathResource(he, "pulsar-io-data-generator.nar"));
+                fileServer.createContext("/pulsar-functions-api-examples.jar",
+                        he -> serveClasspathResource(he, "pulsar-functions-api-examples.jar"));
+                fileServer.setExecutor(null); // creates a default executor
+                log.info("Starting file server...");
+                fileServer.start();
+            } catch (Exception e) {
+                log.error("Failed to start file server: ", e);
+                // fileServer is still null when HttpServer.create itself threw
+                if (fileServer != null) {
+                    fileServer.stop(0);
+                }
+            }
+
+        });
+        fileServerThread.start();
+        // HttpServer.start() only spawns the server's own background thread and returns, so the
+        // bootstrap thread finishes quickly; waiting for it guarantees that the server is
+        // listening and that the fileServer field is visible before any test or teardown runs.
+        fileServerThread.join();
+    }
+
+    /**
+     * Answers one HTTP request with the bytes of a classpath resource.
+     *
+     * @param he the exchange to answer; it is always closed before returning
+     * @param resourceName name of the resource, resolved through this class's class loader
+     */
+    private void serveClasspathResource(com.sun.net.httpserver.HttpExchange he, String resourceName) {
+        try {
+            Headers headers = he.getResponseHeaders();
+            headers.add("Content-Type", "application/octet-stream");
+
+            File file = new File(getClass().getClassLoader().getResource(resourceName).getFile());
+            // readAllBytes reads the complete file and closes it; a single InputStream.read call
+            // is not guaranteed to fill the buffer
+            byte[] bytes = java.nio.file.Files.readAllBytes(file.toPath());
+
+            he.sendResponseHeaders(200, bytes.length);
+            try (OutputStream outputStream = he.getResponseBody()) {
+                outputStream.write(bytes, 0, bytes.length);
+            }
+        } catch (Exception e) {
+            log.error("Error when downloading: {}", e, e);
+        } finally {
+            // release the exchange even on failure so the requesting client is not left hanging
+            he.close();
+        }
+    }
+
+    /**
+     * Stops everything that was started for the test method: the HTTP file server, the Pulsar
+     * client and admin handles, the functions worker, the broker and the bookkeeper ensemble.
+     *
+     * @throws Exception if any component fails to stop
+     */
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        // fileServer is assigned on the file-server thread and stays null when HttpServer.create
+        // failed there; guard it so the remaining teardown steps are not skipped by an NPE
+        if (fileServer != null) {
+            fileServer.stop(0);
+        }
+        fileServerThread.interrupt();
+        pulsarClient.close();
+        admin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    /**
+     * Creates the functions worker that is embedded in the test broker.
+     *
+     * <p>Also points the Java instance jar system property at the jar that contains
+     * {@link FutureUtil}, and stores the resulting configuration and worker id in the
+     * {@code workerConfig} and {@code workerId} fields.
+     *
+     * @param config broker configuration supplying the TLS ports, cluster name and advertised address
+     * @return a worker service built from the populated {@code workerConfig}
+     */
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+        final String instanceJarPath =
+                FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, instanceJarPath);
+
+        workerConfig = new WorkerConfig();
+
+        // namespaces, scheduling and runtime
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName(CLUSTER));
+
+        // worker talks to local broker
+        final String brokerUrl = "pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get();
+        final String webUrl = "https://127.0.0.1:" + config.getWebServicePortTls().get();
+        workerConfig.setPulsarServiceUrl(brokerUrl);
+        workerConfig.setPulsarWebServiceUrl(webUrl);
+
+        // internal topics and check intervals
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+
+        // worker identity
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        final String workerHost = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + workerHost + "-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(workerHost);
+        workerConfig.setWorkerId(workerId);
+
+        // TLS client authentication
+        final String tlsAuthParams =
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH);
+        workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(tlsAuthParams);
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
+
+        return new WorkerService(workerConfig);
+    }
+
+    /**
+     * Builds the configuration of the Java {@code ExclamationFunction} example used by the tests.
+     *
+     * @param tenant tenant the function belongs to
+     * @param namespace namespace the function belongs to
+     * @param functionName name of the function
+     * @param sourceTopic the single input topic
+     * @param sinkTopic the output topic
+     * @param subscriptionName subscription name used on the input topic
+     * @return a config with parallelism 1, EFFECTIVELY_ONCE guarantees, auto-ack and subscription cleanup enabled
+     */
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+        final FunctionConfig fnConfig = new FunctionConfig();
+
+        // identity
+        fnConfig.setTenant(tenant);
+        fnConfig.setNamespace(namespace);
+        fnConfig.setName(functionName);
+
+        // what to run
+        fnConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        fnConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        fnConfig.setParallelism(1);
+        fnConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+
+        // input side
+        fnConfig.setInputs(Collections.singleton(sourceTopic));
+        fnConfig.setSubName(subscriptionName);
+        fnConfig.setAutoAck(true);
+        fnConfig.setCleanupSubscription(true);
+
+        // output side
+        fnConfig.setOutput(sinkTopic);
+
+        return fnConfig;
+    }
+
+    /**
+     * Builds a source configuration with parallelism 1 and ATLEAST_ONCE guarantees.
+     *
+     * @param tenant tenant the source belongs to
+     * @param namespace namespace the source belongs to
+     * @param functionName name of the source
+     * @param sinkTopic topic the source publishes to
+     * @return the populated source configuration
+     */
+    private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) {
+        final SourceConfig srcConfig = new SourceConfig();
+
+        srcConfig.setTenant(tenant);
+        srcConfig.setNamespace(namespace);
+        srcConfig.setName(functionName);
+
+        srcConfig.setTopicName(sinkTopic);
+        srcConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        srcConfig.setParallelism(1);
+
+        return srcConfig;
+    }
+
+    /**
+     * Builds a sink configuration with parallelism 1, ATLEAST_ONCE guarantees and subscription
+     * cleanup enabled, reading from a single topic with a default consumer config.
+     *
+     * @param tenant tenant the sink belongs to
+     * @param namespace namespace the sink belongs to
+     * @param functionName name of the sink
+     * @param sourceTopic topic the sink consumes from
+     * @param subName subscription name used on {@code sourceTopic}
+     * @return the populated sink configuration
+     */
+    private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) {
+        final SinkConfig cfg = new SinkConfig();
+
+        cfg.setTenant(tenant);
+        cfg.setNamespace(namespace);
+        cfg.setName(functionName);
+
+        final ConsumerConfig defaultConsumer = ConsumerConfig.builder().build();
+        cfg.setInputSpecs(Collections.singletonMap(sourceTopic, defaultConsumer));
+        cfg.setSourceSubscriptionName(subName);
+        cfg.setCleanupSubscription(true);
+
+        cfg.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        cfg.setParallelism(1);
+
+        return cfg;
+    }
+    /**
+     * Runs the example function through {@link LocalRunner} and validates it end to end:
+     * the function subscribes to the source topic, every produced message shows up on the sink
+     * topic with "!" appended and its property preserved, and after the runner is stopped the
+     * function's consumer and producer are gone.
+     *
+     * @param jarFilePathUrl value passed to {@code FunctionConfig.setJar}; may be {@code null}
+     * @throws Exception on any admin, client or runner failure
+     */
+    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
+
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+                sourceTopic, sinkTopic, subscriptionName);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+        functionConfig.setJar(jarFilePathUrl);
+        LocalRunner localRunner = LocalRunner.builder()
+                .functionConfig(functionConfig)
+                .clientAuthPlugin(AuthenticationTls.class.getName())
+                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+                .useTls(true)
+                .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsAllowInsecureConnection(true)
+                .tlsHostNameVerificationEnabled(false)
+                .brokerServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()).build();
+        localRunner.start(false);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats stats = admin.topics().getStats(sourceTopic);
+                return stats.subscriptions.get(subscriptionName) != null
+                        && !stats.subscriptions.get(subscriptionName).consumers.isEmpty();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate the function's consumer has started on the source topic
+        TopicStats stats = admin.topics().getStats(sourceTopic);
+        assertTrue(stats.subscriptions.get(subscriptionName) != null
+                && !stats.subscriptions.get(subscriptionName).consumers.isEmpty());
+
+        int totalMsgs = 5;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        for (int i = 0; i < totalMsgs; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            // a timed receive yields null on timeout; fail with a clear message instead of an NPE
+            assertTrue(msg != null, "Timed out waiting for message " + i + " on " + sinkTopic);
+            String receivedPropertyValue = msg.getProperty(propertyKey);
+            // TestNG argument order is (actual, expected)
+            assertEquals(receivedPropertyValue, propertyValue);
+            assertEquals(msg.getValue(), "my-message-" + i + "!");
+        }
+
+        // validate that not all produced messages are still unacknowledged on the source subscription.
+        // Compared numerically on purpose: assertNotEquals(statsField, int) resolves to the Object
+        // overload, and operands boxed to different wrapper types never compare equal, which would
+        // make the check pass unconditionally (presumably unackedMessages is a long -- TODO confirm).
+        assertTrue(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages
+                != totalMsgs);
+
+        // stop functions
+        localRunner.stop();
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+                return topicStats.subscriptions.get(subscriptionName) != null
+                        && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 20, 150);
+
+        TopicStats topicStats = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats.subscriptions.get(subscriptionName) != null
+                && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty());
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 0);
+            } catch (PulsarAdminException e) {
+                if (e.getStatusCode() == 404) {
+                    return true;
+                }
+                return false;
+            }
+        }, 10, 150);
+
+        // a 404 means the sink topic is gone altogether, which also counts as "no publishers"
+        try {
+            assertTrue(admin.topics().getStats(sinkTopic).publishers.size() == 0);
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() != 404) {
+                fail();
+            }
+        }
+    }
+
+    /** Runs the end-to-end function test without setting a jar on the function config. */
+    @Test(timeOut = 20000)
+    public void testE2EPulsarFunctionLocalRun() throws Exception {
+        final String noJar = null;
+        testE2EPulsarFunctionLocalRun(noJar);
+    }
+
+    /** Runs the end-to-end function test with the examples jar referenced through a file URL. */
+    @Test(timeOut = 20000)
+    public void testE2EPulsarFunctionLocalRunWithJar() throws Exception {
+        final URL examplesJar = getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar");
+        testE2EPulsarFunctionLocalRun(Utils.FILE + ":" + examplesJar.getFile());
+    }
+
+    /** Runs the end-to-end function test with the examples jar downloaded from the test file server. */
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionLocalRunURL() throws Exception {
+        testE2EPulsarFunctionLocalRun(
+                String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort));
+    }
+
+ /**
+ * Runs a source through {@link LocalRunner} and validates that it attaches exactly one publisher
+ * (tagged with the source's "tenant/namespace/name" id) to the output topic, that entries get
+ * written, and that the publisher disappears once the runner is stopped.
+ *
+ * @param jarFilePathUrl value passed to {@code SourceConfig.setArchive}; may be {@code null}
+ * @throws Exception on any admin or runner failure
+ */
+ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String sourceName = "PulsarSource-test";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+ SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+ // the class name is only set explicitly when no ".nar" archive is supplied
+ // (presumably a NAR declares its own source class -- TODO confirm)
+ if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
+ sourceConfig.setClassName(DataGeneratorSource.class.getName());
+ }
+
+ sourceConfig.setArchive(jarFilePathUrl);
+ LocalRunner localRunner = LocalRunner.builder()
+ .sourceConfig(sourceConfig)
+ .clientAuthPlugin(AuthenticationTls.class.getName())
+ .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+ .useTls(true)
+ .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+ .tlsAllowInsecureConnection(true)
+ .tlsHostNameVerificationEnabled(false)
+ .brokerServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()).build();
+
+ localRunner.start(false);
+
+ // wait until the source has attached its producer to the output topic
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 150);
+
+ // wait until that producer's metadata carries the expected "tenant/namespace/name" id
+ retryStrategically((test) -> {
+ try {
+ TopicStats sourceStats = admin.topics().getStats(sinkTopic);
+ return sourceStats.publishers.size() == 1
+ && sourceStats.publishers.get(0).metadata != null
+ && sourceStats.publishers.get(0).metadata.containsKey("id")
+ && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+
+ // the retries above only wait; these assertions are what actually fail the test
+ TopicStats sourceStats = admin.topics().getStats(sinkTopic);
+ assertEquals(sourceStats.publishers.size(), 1);
+ assertTrue(sourceStats.publishers.get(0).metadata != null);
+ assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+ assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+ // wait for more than 4 entries on the output topic; only the publisher count is asserted afterwards
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic).publishers.size() == 1)
+ && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+ assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+
+ localRunner.stop();
+
+ // after stopping, the publisher must go away; a 404 (topic not found) is accepted as well
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic).publishers.size() == 0);
+ } catch (PulsarAdminException e) {
+ if (e.getStatusCode() == 404) {
+ return true;
+ }
+ return false;
+ }
+ }, 10, 150);
+
+ try {
+ assertTrue(admin.topics().getStats(sinkTopic).publishers.size() == 0);
+ } catch (PulsarAdminException e) {
+ if (e.getStatusCode() != 404) {
+ fail();
+ }
+ }
+ }
+
+
+    /** Runs the source test without an archive, so the source class name is set explicitly. */
+    @Test(timeOut = 20000)
+    public void testPulsarSourceLocalRunNoArchive() throws Exception {
+        final String noArchive = null;
+        testPulsarSourceLocalRun(noArchive);
+    }
+
+ // TODO bug to fix involving submitting a NAR via URI file:///tmp/pulsar-io-twitter-0.0.1.nar
+// @Test(timeOut = 20000)
+// public void testPulsarSourceLocalRunWithFile() throws Exception {
+// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+// testPulsarSourceLocalRun(jarFilePathUrl);
+// }
+
+    /** Runs the source test with the data-generator NAR downloaded from the test file server. */
+    @Test(timeOut = 40000)
+    public void testPulsarSourceLocalRunWithUrl() throws Exception {
+        testPulsarSourceLocalRun(
+                String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort));
+    }
+
+
+ /**
+ * Runs a sink through {@link LocalRunner} and validates that it subscribes to the input topic
+ * with a single consumer holding 1000 permits, and that the consumer is gone (while the
+ * subscription remains) once the runner is stopped.
+ *
+ * @param jarFilePathUrl value passed to {@code SinkConfig.setArchive}; may be {@code null}
+ * @throws Exception on any admin, client or runner failure
+ */
+ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/input";
+ final String sinkName = "PulsarSink-test";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ // NOTE(review): "local" is the same value as the CLUSTER constant that the sibling tests use here
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("local"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+ SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+
+ // replace the default input spec with one using a receiver queue of 1000; the permit checks below expect that number
+ sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
+ // the class name is only set explicitly when no ".nar" archive is supplied
+ // (presumably a NAR declares its own sink class -- TODO confirm)
+ if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
+ sinkConfig.setClassName(DataGeneratorPrintSink.class.getName());
+ }
+
+ sinkConfig.setArchive(jarFilePathUrl);
+ LocalRunner localRunner = LocalRunner.builder()
+ .sinkConfig(sinkConfig)
+ .clientAuthPlugin(AuthenticationTls.class.getName())
+ .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+ .useTls(true)
+ .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+ .tlsAllowInsecureConnection(true)
+ .tlsHostNameVerificationEnabled(false)
+ .brokerServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()).build();
+
+ localRunner.start(false);
+
+ // wait until the sink's single consumer is attached and reports 1000 available permits
+ retryStrategically((test) -> {
+ try {
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
+
+ return topicStats.subscriptions.containsKey(subscriptionName)
+ && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
+ && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000;
+
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 20, 150);
+
+ // the retry above only waits; these assertions are what actually fail the test
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
+ assertEquals(topicStats.subscriptions.size(), 1);
+ assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
+ assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+ assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 1000);
+
+ int totalMsgs = 10;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+ }
+ // NOTE(review): msgThroughputOut is compared against a message count here; the field name
+ // suggests a throughput figure rather than a counter -- confirm the intended field.
+ // Nothing is asserted on these values afterwards, so the condition only influences how long this loop waits.
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 200);
+
+ // stop sink
+ localRunner.stop();
+
+ // the subscription is expected to remain, but without any connected consumer
+ retryStrategically((test) -> {
+ try {
+ TopicStats stats = admin.topics().getStats(sourceTopic);
+ return stats.subscriptions.get(subscriptionName) != null
+ && stats.subscriptions.get(subscriptionName).consumers.isEmpty();
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 20, 150);
+
+ topicStats = admin.topics().getStats(sourceTopic);
+ assertTrue(topicStats.subscriptions.get(subscriptionName) != null
+ && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty());
+
+ }
+
+    /** Runs the sink test without an archive, so the sink class name is set explicitly. */
+    @Test(timeOut = 20000)
+    public void testPulsarSinkStatsNoArchive() throws Exception {
+        final String noArchive = null;
+        testPulsarSinkStats(noArchive);
+    }
+
+// @Test(timeOut = 20000)
+// public void testPulsarSinkStatsWithFile() throws Exception {
+// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+// testPulsarSinkStats(jarFilePathUrl);
+// }
+
+    /** Runs the sink test with the data-generator NAR downloaded from the test file server. */
+    @Test(timeOut = 40000)
+    public void testPulsarSinkStatsWithUrl() throws Exception {
+        testPulsarSinkStats(
+                String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort));
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
new file mode 100644
index 0000000..3e0f2fe
--- /dev/null
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -0,0 +1,374 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-functions-local-runner</artifactId>
+ <name>Pulsar Functions :: Local Runner Shaded</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-local-runner-original</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>false</minimizeJar>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+ </transformers>
+ <artifactSet>
+ <excludes>
+ <exclude>io.netty:netty-common</exclude>
+ <exclude>io.netty:netty-buffer</exclude>
+ <exclude>io.netty:netty-codec-http2</exclude>
+ <exclude>io.netty:netty-codec-http</exclude>
+ <exclude>io.netty:netty-codec-socks</exclude>
+ <exclude>io.netty:netty-codec</exclude>
+ <exclude>io.netty:netty-handler</exclude>
+ <exclude>io.netty:netty-handler-proxy</exclude>
+ <exclude>io.netty:netty-transport</exclude>
+ <exclude>io.netty:netty-resolver</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Shading signed JARs will fail without
+ this. http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>com.typesafe.netty</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.http</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.jute</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.jute</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>javax.servlet</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.servlet</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.junit</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.junit</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>junit</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.junit</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>net.jodah</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jodah</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.lz4</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.lz4</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.reactivestreams</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.reactivestreams</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.swagger</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.yaml</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.yaml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.jctools</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.jctools</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.squareup.okhttp</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okhttp</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.grpc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.joda</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.joda</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>javax.ws.rs</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.ws.rs</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.kubernetes</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.kubernetes</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.opencensus</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.opencensus</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>net.jpountz</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jpountz</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.aspectj</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.aspectj</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>commons-configuration</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-configuration</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.tukaani</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.tukaani</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.github</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>commons-io</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-io</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.distributedlog</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.inferred</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.bookkeeper</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.bookkeeper</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bookkeeper</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>dlshade</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.codehaus.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>net.java.dev.jna</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.curator</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.curator</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>javax.validation</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.validation</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>javax.activation</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.activation</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.prometheus</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.zookeeper</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.jsonwebtoken</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.jsonwebtoken</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>commons-codec</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-codec</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.thoughtworks.paranamer</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.thoughtworks.paranamer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.codehaus.mojo</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.mojo</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.github.luben</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github.luben</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>jline</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.jline</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>commons-logging</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-logging</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.bouncycastle</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bouncycastle</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.xerial.snappy</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.xerial.snappy</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>javax.annotation</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.annotation</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.checkerframework</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.checkerframework</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.yetus</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.yetus</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>commons-cli</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-cli</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>commons-lang</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-lang</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.squareup.okio</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okio</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.rocksdb</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.rocksdb</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.objenesis</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.objenesis</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.eclipse.jetty</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.eclipse.jetty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.avro</shadedPattern>
+ </relocation>
+ <relocation>
+ <!-- target package mirrors the source pattern, like every other relocation in this list -->
+ <pattern>avro.shaded</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.avro.shaded</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.yahoo</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.beust</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.hamcrest</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>aj.org</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.aj.org</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.scurrilous</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.scurrilous</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>okio</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.okio</shadedPattern>
+ </relocation>
+ <!--
+ asynchttpclient can only be shaded to be under `org.apache.pulsar.shade`
+ see {@link https://github.com/apache/incubator-pulsar/pull/390}
+ and {@link https://github.com/apache/incubator-pulsar/blob/master/pulsar-client/src/main/resources/ahc.properties}
+ -->
+ <relocation>
+ <pattern>org.asynchttpclient</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+ </relocation>
+ <!-- DON'T ever shade log4j, otherwise logging won't work anymore when running functions in process mode
+ <relocation>
+ <pattern>org.apache.logging</pattern>
+ <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.logging</shadedPattern>
+ </relocation>
+ -->
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-functions/localrun/pom.xml b/pulsar-functions/localrun/pom.xml
new file mode 100644
index 0000000..d53b0ab
--- /dev/null
+++ b/pulsar-functions/localrun/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-functions-local-runner-original</artifactId>
+ <name>Pulsar Functions :: Local Runner original</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-runtime</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
new file mode 100644
index 0000000..1540744
--- /dev/null
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
+import static org.apache.pulsar.functions.utils.FunctionCommon.extractClassLoader;
+import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
+
+@Slf4j
+public class LocalRunner {
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final List<RuntimeSpawner> spawners = new LinkedList<>();
+
+ /**
+  * Runtime selectable through {@code --runtime}. In {@code start}, PROCESS always goes through
+  * startProcessMode; THREAD (or no value) goes through startThreadedMode, but only for Java
+  * functions, sources and sinks.
+  */
+ public enum RuntimeEnv {
+ THREAD,
+ PROCESS
+ }
+
+ /** JCommander converter that deserializes the JSON value of {@code --functionConfig}. */
+ public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
+     @Override
+     public FunctionConfig convert(String value) {
+         final FunctionConfig parsed;
+         try {
+             parsed = ObjectMapperFactory.getThreadLocal().readValue(value, FunctionConfig.class);
+         } catch (IOException ioe) {
+             // surface malformed JSON as an unchecked failure, keeping the cause
+             throw new RuntimeException("Failed to parse function config:", ioe);
+         }
+         return parsed;
+     }
+ }
+
+ /** JCommander converter that deserializes the JSON value of {@code --sourceConfig}. */
+ public static class SourceConfigConverter implements IStringConverter<SourceConfig> {
+     @Override
+     public SourceConfig convert(String value) {
+         final SourceConfig parsed;
+         try {
+             parsed = ObjectMapperFactory.getThreadLocal().readValue(value, SourceConfig.class);
+         } catch (IOException ioe) {
+             // surface malformed JSON as an unchecked failure, keeping the cause
+             throw new RuntimeException("Failed to parse source config:", ioe);
+         }
+         return parsed;
+     }
+ }
+
+ /** JCommander converter that deserializes the JSON value of {@code --sinkConfig}. */
+ public static class SinkConfigConverter implements IStringConverter<SinkConfig> {
+     @Override
+     public SinkConfig convert(String value) {
+         final SinkConfig parsed;
+         try {
+             parsed = ObjectMapperFactory.getThreadLocal().readValue(value, SinkConfig.class);
+         } catch (IOException ioe) {
+             // surface malformed JSON as an unchecked failure, keeping the cause
+             throw new RuntimeException("Failed to parse sink config:", ioe);
+         }
+         return parsed;
+     }
+ }
+
+ /**
+  * JCommander converter for {@code --runtime}. Matching is case-insensitive and ignores
+  * surrounding whitespace, so the "Thread"/"Process" spellings advertised in the option
+  * description are accepted as well as the exact enum constant names.
+  */
+ public static class RuntimeConverter implements IStringConverter<RuntimeEnv> {
+     /**
+      * @param value the raw command-line value
+      * @return the matching {@link RuntimeEnv}
+      * @throws IllegalArgumentException if the value names no {@link RuntimeEnv} constant
+      */
+     @Override
+     public RuntimeEnv convert(String value) {
+         // RuntimeEnv.valueOf only matches the exact constant names (THREAD, PROCESS),
+         // so normalize first; Locale.ROOT keeps the upper-casing locale independent.
+         return RuntimeEnv.valueOf(value.trim().toUpperCase(java.util.Locale.ROOT));
+     }
+ }
+
+ // Command-line options: main() hands this object to JCommander, which parses the arguments
+ // into these fields. The @Builder constructor assigns the same fields for programmatic use.
+
+ // Exactly one of the three configs below is expected; start() checks them in this order.
+ @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
+ protected FunctionConfig functionConfig;
+ @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
+ protected SourceConfig sourceConfig;
+ @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
+ protected SinkConfig sinkConfig;
+ @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
+ protected String stateStorageServiceUrl;
+ // When left null, start() falls back to DEFAULT_SERVICE_URL.
+ @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
+ protected String brokerServiceUrl;
+ // The authentication and TLS options below are copied into the AuthenticationConfig built in start().
+ @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
+ protected String clientAuthPlugin;
+ @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
+ protected String clientAuthParams;
+ @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
+ protected boolean useTls;
+ @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
+ protected boolean tlsAllowInsecureConnection;
+ @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
+ protected boolean tlsHostNameVerificationEnabled;
+ @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
+ protected String tlsTrustCertFilePath;
+ // Added to the loop index when instance ids are assigned in startProcessMode.
+ @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
+ protected int instanceIdOffset = 0;
+ // A null value is treated like THREAD by start().
+ @Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
+ protected RuntimeEnv runtimeEnv;
+
+ private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
+
+ /**
+  * Command-line entry point: builds an empty runner, lets JCommander fill its
+  * {@code @Parameter} fields from {@code args}, then starts it in blocking mode.
+  *
+  * @param args command-line arguments
+  * @throws Exception if starting the local run fails
+  */
+ public static void main(String[] args) throws Exception {
+     final LocalRunner runner = LocalRunner.builder().build();
+
+     // bind the runner's options and parse the command line into them
+     final JCommander commander = new JCommander(runner);
+     commander.setProgramName("LocalRunner");
+     commander.parse(args);
+
+     runner.start(true);
+ }
+
+ /**
+  * Creates a runner from explicit settings (exposed through the Lombok builder) and
+  * registers a JVM shutdown hook that calls {@link #stop()} on this instance.
+  */
+ @Builder
+ public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, SinkConfig sinkConfig,
+                    String stateStorageServiceUrl, String brokerServiceUrl,
+                    String clientAuthPlugin, String clientAuthParams,
+                    boolean useTls, boolean tlsAllowInsecureConnection,
+                    boolean tlsHostNameVerificationEnabled, String tlsTrustCertFilePath,
+                    int instanceIdOffset, RuntimeEnv runtimeEnv) {
+     // what to run
+     this.functionConfig = functionConfig;
+     this.sourceConfig = sourceConfig;
+     this.sinkConfig = sinkConfig;
+
+     // where to connect
+     this.stateStorageServiceUrl = stateStorageServiceUrl;
+     this.brokerServiceUrl = brokerServiceUrl;
+
+     // authentication and TLS
+     this.clientAuthPlugin = clientAuthPlugin;
+     this.clientAuthParams = clientAuthParams;
+     this.useTls = useTls;
+     this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
+     this.tlsHostNameVerificationEnabled = tlsHostNameVerificationEnabled;
+     this.tlsTrustCertFilePath = tlsTrustCertFilePath;
+
+     // execution settings
+     this.instanceIdOffset = instanceIdOffset;
+     this.runtimeEnv = runtimeEnv;
+
+     // close the spawners when the JVM goes down
+     java.lang.Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
+ }
+
+ /**
+  * Marks the runner as not running, closes every tracked {@link RuntimeSpawner}
+  * and forgets them. Holds the instance lock for the whole call.
+  */
+ public synchronized void stop() {
+     running.set(false);
+     log.info("Shutting down the localrun runtimeSpawner ...");
+     spawners.forEach(RuntimeSpawner::close);
+     spawners.clear();
+ }
+
+ /**
+  * Starts the configured function, source or sink locally.
+  *
+  * <p>Exactly one of {@code functionConfig}, {@code sourceConfig} or {@code sinkConfig} is used
+  * (checked in that order). The user code location is resolved, the config is validated and
+  * converted into {@code FunctionDetails}, and the instances are launched either as threads or
+  * as processes. Setup runs under the instance lock; the optional blocking wait does not.
+  *
+  * @param blocking when {@code true}, waits for every spawner started by this call to finish
+  * @throws IllegalArgumentException if the runner is already started or no config was supplied
+  * @throws UnsupportedOperationException if the function runtime is not JAVA, GO or PYTHON
+  * @throws RuntimeException if the resolved jar/archive does not exist on the local file system
+  * @throws Exception for failures propagated from validation or from launching the instances
+  */
+ public void start(boolean blocking) throws Exception {
+     List<RuntimeSpawner> local = new LinkedList<>();
+     synchronized (this) {
+         if (running.get()) {
+             throw new IllegalArgumentException("Pulsar Function local run already started!");
+         }
+
+         Function.FunctionDetails functionDetails;
+         String userCodeFile;
+         int parallelism;
+         if (functionConfig != null) {
+             FunctionConfigUtils.inferMissingArguments(functionConfig);
+             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+             parallelism = functionConfig.getParallelism();
+             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                 userCodeFile = functionConfig.getJar();
+
+                 // if code file not specified try to get location of the code based on class.
+                 if (userCodeFile == null && functionConfig.getClassName() != null) {
+                     userCodeFile = Thread.currentThread().getContextClassLoader()
+                             .loadClass(functionConfig.getClassName())
+                             .getProtectionDomain().getCodeSource().getLocation().getFile();
+                 }
+
+                 if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                     File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                     classLoader = FunctionConfigUtils.validate(functionConfig, file);
+                 } else {
+                     File file = new File(userCodeFile);
+                     if (!file.exists()) {
+                         throw new RuntimeException("User jar does not exist");
+                     }
+                     classLoader = FunctionConfigUtils.validate(functionConfig, file);
+                 }
+
+             } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+                 userCodeFile = functionConfig.getGo();
+             } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
+                 userCodeFile = functionConfig.getPy();
+             } else {
+                 throw new UnsupportedOperationException();
+             }
+
+             functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader);
+         } else if (sourceConfig != null) {
+             inferMissingArguments(sourceConfig);
+             userCodeFile = sourceConfig.getArchive();
+
+             // if code file not specified try to get location of the code based on class.
+             if (userCodeFile == null && sourceConfig.getClassName() != null) {
+                 userCodeFile = Thread.currentThread().getContextClassLoader()
+                         .loadClass(sourceConfig.getClassName())
+                         .getProtectionDomain().getCodeSource().getLocation().getFile();
+             }
+
+             // last resort: fall back to the location this runner itself was loaded from
+             if (userCodeFile == null) {
+                 userCodeFile = Thread.currentThread().getContextClassLoader()
+                         .loadClass(LocalRunner.class.getName())
+                         .getProtectionDomain().getCodeSource().getLocation().getFile();
+             }
+             log.info("userCodeFile: {}", userCodeFile);
+
+             String builtInSource = isBuiltInSource(userCodeFile);
+             if (builtInSource != null) {
+                 sourceConfig.setArchive(builtInSource);
+             }
+             parallelism = sourceConfig.getParallelism();
+
+             if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                 File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                 functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+
+             } else {
+                 File file = new File(userCodeFile);
+                 if (!file.exists()) {
+                     throw new RuntimeException("Source archive does not exist");
+                 }
+                 functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+             }
+         } else if (sinkConfig != null) {
+             inferMissingArguments(sinkConfig);
+             userCodeFile = sinkConfig.getArchive();
+
+             // if code file not specified try to get location of the code based on class.
+             if (userCodeFile == null && sinkConfig.getClassName() != null) {
+                 userCodeFile = Thread.currentThread().getContextClassLoader()
+                         .loadClass(sinkConfig.getClassName())
+                         .getProtectionDomain().getCodeSource().getLocation().getFile();
+             }
+
+             // NOTE(review): the sink branch resolves built-ins through isBuiltInSource;
+             // confirm whether a sink-specific lookup was intended here.
+             String builtInSink = isBuiltInSource(userCodeFile);
+             if (builtInSink != null) {
+                 sinkConfig.setArchive(builtInSink);
+             }
+             parallelism = sinkConfig.getParallelism();
+
+             if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                 File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                 functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+             } else {
+                 File file = new File(userCodeFile);
+                 if (!file.exists()) {
+                     throw new RuntimeException("Sink archive does not exist");
+                 }
+                 functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+             }
+         } else {
+             throw new IllegalArgumentException("Must specify Function, Source or Sink config");
+         }
+
+         // default the java instance jar property to the location this class was loaded from
+         if (System.getProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY) == null) {
+             System.setProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY,
+                     LocalRunner.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+         }
+
+         AuthenticationConfig authConfig = AuthenticationConfig.builder().clientAuthenticationPlugin
+                 (clientAuthPlugin)
+                 .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                 .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                 .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                 .tlsTrustCertsFilePath(tlsTrustCertFilePath).build();
+
+         String serviceUrl = DEFAULT_SERVICE_URL;
+         if (brokerServiceUrl != null) {
+             serviceUrl = brokerServiceUrl;
+         }
+
+         if ((sourceConfig != null || sinkConfig != null || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA)
+                 && (runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
+             // By default run java functions as threads
+             startThreadedMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
+                     stateStorageServiceUrl, authConfig, userCodeFile);
+         } else {
+             startProcessMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
+                     stateStorageServiceUrl, authConfig, userCodeFile);
+         }
+
+         // Record the started state only after the launch succeeded, so that a failed start
+         // can be retried while a second start() on a live runner is rejected by the guard
+         // above. stop() resets the flag.
+         running.set(true);
+         local.addAll(spawners);
+     }
+
+     // wait outside the lock so stop() can still run while this thread is blocked
+     if (blocking) {
+         for (RuntimeSpawner spawner : local) {
+             spawner.join();
+             log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
+         }
+     }
+ }
+
+ /**
+  * Launches {@code parallelism} instances through a {@link ProcessRuntimeFactory} and schedules
+  * a timer that logs the JSON status of all tracked spawners every 30 seconds.
+  *
+  * @param functionDetails        details of the function/source/sink to run
+  * @param parallelism            number of instances to start
+  * @param instanceIdOffset       value added to the loop index to form each instance id
+  * @param serviceUrl             Pulsar broker service URL handed to the runtime factory
+  * @param stateStorageServiceUrl state storage URL handed to the runtime factory
+  * @param authConfig             client authentication/TLS settings
+  * @param userCodeFile           location of the user code handed to each spawner
+  * @throws Exception if creating the factory or starting a spawner fails
+  */
+ private void startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
+                               int parallelism, int instanceIdOffset, String serviceUrl,
+                               String stateStorageServiceUrl, AuthenticationConfig authConfig,
+                               String userCodeFile) throws Exception {
+
+     try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
+             serviceUrl,
+             stateStorageServiceUrl,
+             authConfig,
+             null, /* java instance jar file */
+             null, /* python instance file */
+             null, /* log directory */
+             null, /* extra dependencies dir */
+             new DefaultSecretsProviderConfigurator(), false)) {
+
+         for (int i = 0; i < parallelism; ++i) {
+             InstanceConfig instanceConfig = new InstanceConfig();
+             instanceConfig.setFunctionDetails(functionDetails);
+             // TODO: correctly implement function version and id
+             instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+             instanceConfig.setFunctionId(UUID.randomUUID().toString());
+             instanceConfig.setInstanceId(i + instanceIdOffset);
+             instanceConfig.setMaxBufferedTuples(1024);
+             instanceConfig.setPort(FunctionCommon.findAvailablePort());
+             instanceConfig.setClusterName("local");
+             RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+                     instanceConfig,
+                     userCodeFile,
+                     null,
+                     containerFactory,
+                     30000);
+             spawners.add(runtimeSpawner);
+             runtimeSpawner.start();
+         }
+
+         // one pretty-printer for every status report instead of one per instance per tick
+         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+         Timer statusCheckTimer = new Timer();
+         statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+             @Override
+             public void run() {
+                 // stop() clears the shared list under the instance lock; copy it under the
+                 // same lock so this timer thread never iterates a list that is being modified.
+                 final List<RuntimeSpawner> snapshot;
+                 synchronized (LocalRunner.this) {
+                     snapshot = new LinkedList<>(spawners);
+                 }
+
+                 CompletableFuture<String>[] futures = new CompletableFuture[snapshot.size()];
+                 int index = 0;
+                 for (RuntimeSpawner spawner : snapshot) {
+                     futures[index] = spawner.getFunctionStatusAsJson(index);
+                     index++;
+                 }
+                 try {
+                     CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+                     for (index = 0; index < futures.length; ++index) {
+                         String json = futures[index].get();
+                         log.info(gson.toJson(new JsonParser().parse(json)));
+                     }
+                 } catch (Exception ex) {
+                     // keep the cause so a failing status check can be diagnosed
+                     log.error("Could not get status from all local instances", ex);
+                 }
+             }
+         }, 30000, 30000);
+         java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+             public void run() {
+                 statusCheckTimer.cancel();
+             }
+         });
+     }
+ }
+
+
+    /**
+     * Creates and starts one {@link RuntimeSpawner} per requested instance, all sharing a single
+     * {@link ThreadRuntimeFactory}. Instance ids run from {@code instanceIdOffset} upwards.
+     */
+    private void startThreadedMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
+                                   int parallelism, int instanceIdOffset, String serviceUrl,
+                                   String stateStorageServiceUrl, AuthenticationConfig authConfig,
+                                   String userCodeFile) throws Exception {
+        final ThreadRuntimeFactory factory = new ThreadRuntimeFactory(
+                "LocalRunnerThreadGroup", serviceUrl, stateStorageServiceUrl, authConfig,
+                new ClearTextSecretsProvider(), null);
+
+        int instanceId = instanceIdOffset;
+        for (int started = 0; started < parallelism; started++, instanceId++) {
+            final InstanceConfig config = new InstanceConfig();
+            config.setFunctionDetails(functionDetails);
+            // TODO: correctly implement function version and id
+            config.setFunctionVersion(UUID.randomUUID().toString());
+            config.setFunctionId(UUID.randomUUID().toString());
+            config.setInstanceId(instanceId);
+            config.setMaxBufferedTuples(1024);
+            config.setPort(FunctionCommon.findAvailablePort());
+            config.setClusterName("local");
+
+            // The spawner is recorded in `spawners` before it is started.
+            final RuntimeSpawner spawner =
+                    new RuntimeSpawner(config, userCodeFile, null, factory, 30000);
+            spawners.add(spawner);
+            spawner.start();
+        }
+    }
+
+    /**
+     * Resolves a source type against the locally available connectors.
+     *
+     * @return the string form of the matching connector's archive entry, or {@code null} when
+     *         {@code sourceType} is not among the locally available sources
+     */
+    private String isBuiltInSource(String sourceType) throws IOException {
+        final Connectors available = getConnectors();
+        final boolean builtIn = available.getSources().containsKey(sourceType);
+        if (!builtIn) {
+            // Not a built-in source type: signalled to the caller with null.
+            return null;
+        }
+        // Built-in source type: for local-run it is filled in with the connector's own archive path.
+        return available.getSources().get(sourceType).toString();
+    }
+
+    /**
+     * Resolves a sink type against the locally available connectors.
+     *
+     * @return the string form of the matching connector's archive entry, or {@code null} when
+     *         {@code sinkType} is not among the locally available sinks
+     */
+    private String isBuiltInSink(String sinkType) throws IOException {
+        final Connectors available = getConnectors();
+        final boolean builtIn = available.getSinks().containsKey(sinkType);
+        if (!builtIn) {
+            // Not a built-in sink type: signalled to the caller with null.
+            return null;
+        }
+        // Built-in sink type: for local-run it is filled in with the connector's own archive path.
+        return available.getSinks().get(sinkType).toString();
+    }
+
+    /**
+     * Searches the "connectors" directory for connectors. The directory is looked up under the
+     * PULSAR_HOME environment variable, or under the current working directory when that
+     * variable is not set.
+     */
+    private Connectors getConnectors() throws IOException {
+        final String envHome = System.getenv("PULSAR_HOME");
+        final String baseDir = (envHome != null)
+                ? envHome
+                : Paths.get("").toAbsolutePath().toString();
+        return ConnectorUtils.searchForConnectors(Paths.get(baseDir, "connectors").toString());
+    }
+}
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 0f0219a..172acf0 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -41,6 +41,8 @@
<module>runtime-all</module>
<module>worker</module>
<module>secrets</module>
+ <module>localrun</module>
+ <module>localrun-shaded</module>
</modules>
</project>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
deleted file mode 100644
index 1c180ab..0000000
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.runtime;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.utils.io.Connectors;
-
-import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
-import static org.apache.pulsar.functions.utils.FunctionCommon.*;
-
-@Slf4j
-public class LocalRunner {
-
- @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true)
- protected String functionConfigString;
- @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true)
- protected String sourceConfigString;
- @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true)
- protected String sinkConfigString;
- @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
- protected String stateStorageServiceUrl;
- @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
- protected String brokerServiceUrl;
- @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
- protected String clientAuthPlugin;
- @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
- protected String clientAuthParams;
- @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
- protected boolean useTls;
- @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
- protected boolean tlsAllowInsecureConnection;
- @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
- protected boolean tlsHostNameVerificationEnabled;
- @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
- protected String tlsTrustCertFilePath;
- @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
- protected Integer instanceIdOffset = 0;
- private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
-
- public static void main(String[] args) throws Exception {
- LocalRunner localRunner = new LocalRunner();
- JCommander jcommander = new JCommander(localRunner);
- jcommander.setProgramName("LocalRunner");
-
- // parse args by JCommander
- jcommander.parse(args);
- localRunner.start();
- }
-
- void start() throws Exception {
- Function.FunctionDetails functionDetails;
- String userCodeFile;
- int parallelism;
- if (!StringUtils.isEmpty(functionConfigString)) {
- FunctionConfig functionConfig = new Gson().fromJson(functionConfigString, FunctionConfig.class);
- FunctionConfigUtils.inferMissingArguments(functionConfig);
- ClassLoader classLoader = null;
- parallelism = functionConfig.getParallelism();
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- userCodeFile = functionConfig.getJar();
- if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- classLoader = extractClassLoader(userCodeFile);
- } else {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("User jar does not exist");
- }
- classLoader = loadJar(file);
- }
- } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
- userCodeFile = functionConfig.getGo();
- } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON){
- userCodeFile = functionConfig.getPy();
- } else {
- throw new UnsupportedOperationException();
- }
- functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader);
- } else if (!StringUtils.isEmpty(sourceConfigString)) {
- SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class);
- inferMissingArguments(sourceConfig);
- String builtInSource = isBuiltInSource(sourceConfig.getArchive());
- if (builtInSource != null) {
- sourceConfig.setArchive(builtInSource);
- }
- parallelism = sourceConfig.getParallelism();
- userCodeFile = sourceConfig.getArchive();
- if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
- } else {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("Source archive does not exist");
- }
- functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
- }
- } else {
- SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class);
- inferMissingArguments(sinkConfig);
- String builtInSink = isBuiltInSource(sinkConfig.getArchive());
- if (builtInSink != null) {
- sinkConfig.setArchive(builtInSink);
- }
- parallelism = sinkConfig.getParallelism();
- userCodeFile = sinkConfig.getArchive();
- if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
- } else {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("Sink archive does not exist");
- }
- functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
- }
- }
- startLocalRun(functionDetails, parallelism,
- instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
- AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
- .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
- .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
- .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
- .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
- userCodeFile);
- }
-
- protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
- int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig,
- String userCodeFile)
- throws Exception {
-
- String serviceUrl = DEFAULT_SERVICE_URL;
- if (brokerServiceUrl != null) {
- serviceUrl = brokerServiceUrl;
- }
-
- try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
- serviceUrl,
- stateStorageServiceUrl,
- authConfig,
- null, /* java instance jar file */
- null, /* python instance file */
- null, /* log directory */
- null, /* extra dependencies dir */
- new DefaultSecretsProviderConfigurator(), false)) {
- List<RuntimeSpawner> spawners = new LinkedList<>();
- for (int i = 0; i < parallelism; ++i) {
- InstanceConfig instanceConfig = new InstanceConfig();
- instanceConfig.setFunctionDetails(functionDetails);
- // TODO: correctly implement function version and id
- instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
- instanceConfig.setFunctionId(UUID.randomUUID().toString());
- instanceConfig.setInstanceId(i + instanceIdOffset);
- instanceConfig.setMaxBufferedTuples(1024);
- instanceConfig.setPort(FunctionCommon.findAvailablePort());
- instanceConfig.setClusterName("local");
- RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
- instanceConfig,
- userCodeFile,
- null,
- containerFactory,
- 30000);
- spawners.add(runtimeSpawner);
- runtimeSpawner.start();
- }
- java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- log.info("Shutting down the localrun runtimeSpawner ...");
- for (RuntimeSpawner spawner : spawners) {
- spawner.close();
- }
- }
- });
- Timer statusCheckTimer = new Timer();
- statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
- int index = 0;
- for (RuntimeSpawner spawner : spawners) {
- futures[index] = spawner.getFunctionStatusAsJson(index);
- index++;
- }
- try {
- CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
- for (index = 0; index < futures.length; ++index) {
- String json = futures[index].get();
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- log.info(gson.toJson(new JsonParser().parse(json)));
- }
- } catch (Exception ex) {
- log.error("Could not get status from all local instances");
- }
- }
- }, 30000, 30000);
- java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- statusCheckTimer.cancel();
- }
- });
- for (RuntimeSpawner spawner : spawners) {
- spawner.join();
- log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
- }
-
- }
- }
-
- private String isBuiltInSource(String sourceType) throws IOException {
- // Validate the connector source type from the locally available connectors
- Connectors connectors = getConnectors();
-
- if (connectors.getSources().containsKey(sourceType)) {
- // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
- return connectors.getSources().get(sourceType).toString();
- } else {
- return null;
- }
- }
-
- private String isBuiltInSink(String sinkType) throws IOException {
- // Validate the connector source type from the locally available connectors
- Connectors connectors = getConnectors();
-
- if (connectors.getSinks().containsKey(sinkType)) {
- // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
- return connectors.getSinks().get(sinkType).toString();
- } else {
- return null;
- }
- }
-
- private Connectors getConnectors() throws IOException {
- // Validate the connector source type from the locally available connectors
- String pulsarHome = System.getenv("PULSAR_HOME");
- if (pulsarHome == null) {
- pulsarHome = Paths.get("").toAbsolutePath().toString();
- }
- String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
- return ConnectorUtils.searchForConnectors(connectorsDir);
- }
-}