You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:22 UTC

[pulsar] 36/38: Support function with format: Function> (#6684)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 55d5430701d41d92ce290d838e332eb9d9154b9e
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri May 8 11:04:09 2020 +0800

    Support function with format: Function<I, CompletableFuture<O>> (#6684)
    
    Fixes #6519
    
    ### Motivation
    
    Currently, Pulsar Functions not support Async mode, e.g. user passed in a Function in format :
    ```
    Function<I, CompletableFuture<O>>
    ```
    This kind of function is useful if the function might use RPCs to call external systems.
    
    e.g.
    ```java
    public class AsyncFunction implements Function<String, CompletableFuture<O>> {
        CompletableFuture<O> apply (String input) {
            CompletableFuture future = new CompletableFuture();
            ...function compute...
            future.whenComplete(() -> {
                ...  call external system  ...
            });
            return future;
        }
    ```
    
    ### Modifications
    - add support for Async Functions support.
    
    ### Verifying this change
    current ut passed.
    
    * support func: Function<I, CompletableFuture<O>>
    
    * add 2 examples
    
    * add limit to the max outstanding items
    (cherry picked from commit 7cd28b9d5cc862fe239f5e8fa1ca616237764607)
---
 .../pulsar/common/functions/FunctionConfig.java    |  3 +
 .../pulsar/functions/instance/InstanceConfig.java  |  3 +
 .../pulsar/functions/instance/JavaInstance.java    | 64 +++++++++++++--
 .../functions/instance/JavaInstanceRunnable.java   | 41 ++++-----
 .../functions/instance/JavaInstanceTest.java       | 96 ++++++++++++++++++++--
 .../api/examples/AsyncContextFunction.java         | 59 +++++++++++++
 .../JavaNativeAsyncExclamationFunction.java        | 41 +++++++++
 .../org/apache/pulsar/functions/LocalRunner.java   |  2 +
 .../functions/runtime/JavaInstanceStarter.java     |  4 +
 .../pulsar/functions/worker/WorkerConfig.java      |  6 ++
 .../pulsar/functions/worker/FunctionActioner.java  |  1 +
 11 files changed, 289 insertions(+), 31 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 5ae0621..13123bc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -120,4 +120,7 @@ public class FunctionConfig {
     // to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
     // interface
     private String customRuntimeOptions;
+    // Max pending async requests per instance to avoid large number of concurrent requests.
+    // Only used in AsyncFunction. Default: 1000.
+    private Integer maxPendingAsyncRequests = 1000;
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index f823728..86f57eda 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -44,6 +44,9 @@ public class InstanceConfig {
     private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
     private int port;
     private String clusterName;
+    // Max pending async requests per instance to avoid large number of concurrent requests.
+    // Only used in AsyncFunction. Default: 1000
+    private int maxPendingAsyncRequests = 1000;
 
     /**
      * Get the string representation of {@link #getInstanceId()}.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 8aee702..1e18a07 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -40,9 +45,18 @@ public class JavaInstance implements AutoCloseable {
     private Function function;
     private java.util.function.Function javaUtilFunction;
 
-    public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
+    // for Async function max out standing items
+    private final InstanceConfig instanceConfig;
+    private final Executor executor;
+    @Getter
+    private final LinkedBlockingQueue<CompletableFuture<Void>> pendingAsyncRequests;
+
+    public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {
 
         this.context = contextImpl;
+        this.instanceConfig = instanceConfig;
+        this.executor = Executors.newSingleThreadExecutor();
+        this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
 
         // create the functions
         if (userClassObject instanceof Function) {
@@ -52,23 +66,63 @@ public class JavaInstance implements AutoCloseable {
         }
     }
 
-    public JavaExecutionResult handleMessage(Record<?> record, Object input) {
+    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
         if (context != null) {
             context.setCurrentMessageContext(record);
         }
+
+        final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
         JavaExecutionResult executionResult = new JavaExecutionResult();
+
+        final Object output;
+
         try {
-            Object output;
             if (function != null) {
                 output = function.process(input, context);
             } else {
                 output = javaUtilFunction.apply(input);
             }
-            executionResult.setResult(output);
         } catch (Exception ex) {
             executionResult.setUserException(ex);
+            future.complete(executionResult);
+            return future;
+        }
+
+        if (output instanceof CompletableFuture) {
+            // Function is in format: Function<I, CompletableFuture<O>>
+            try {
+                pendingAsyncRequests.put((CompletableFuture) output);
+            } catch (InterruptedException ie) {
+                log.warn("Exception while put Async requests", ie);
+                executionResult.setUserException(ie);
+                future.complete(executionResult);
+                return future;
+            }
+
+            ((CompletableFuture) output).whenCompleteAsync((obj, throwable) -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Got result async: object: {}, throwable: {}", obj, throwable);
+                }
+
+                if (throwable != null) {
+                    executionResult.setUserException(new Exception((Throwable)throwable));
+                    pendingAsyncRequests.remove(output);
+                    future.complete(executionResult);
+                    return;
+                }
+                executionResult.setResult(obj);
+                pendingAsyncRequests.remove(output);
+                future.complete(executionResult);
+            }, executor);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Got result: object: {}", output);
+            }
+            executionResult.setResult(output);
+            future.complete(executionResult);
         }
-        return executionResult;
+
+        return future;
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 2db60c7..b983991 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
+import java.util.concurrent.CompletableFuture;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -216,7 +217,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(contextImpl, object);
+        return new JavaInstance(contextImpl, object, instanceConfig);
     }
 
     ContextImpl setupContext() {
@@ -256,7 +257,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 }
 
                 addLogTopicHandler();
-                JavaExecutionResult result;
+                CompletableFuture<JavaExecutionResult> result;
 
                 // set last invocation time
                 stats.setLastInvocation(System.currentTimeMillis());
@@ -274,10 +275,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
                 removeLogTopicHandler();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Got result: {}", result.getResult());
-                }
-
                 try {
                     processResult(currentRecord, result);
                 } catch (Exception e) {
@@ -417,23 +414,27 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     private void processResult(Record srcRecord,
-                               JavaExecutionResult result) throws Exception {
-        if (result.getUserException() != null) {
-            log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException());
-            stats.incrUserExceptions(result.getUserException());
-            srcRecord.fail();
-        } else {
-            if (result.getResult() != null) {
-                sendOutputMessage(srcRecord, result.getResult());
+                               CompletableFuture<JavaExecutionResult> result) throws Exception {
+        result.whenComplete((result1, throwable) -> {
+            if (throwable != null || result1.getUserException() != null) {
+                Throwable t = throwable != null ? throwable : result1.getUserException();
+                log.warn("Encountered exception when processing message {}",
+                        srcRecord, t);
+                stats.incrUserExceptions(t);
+                srcRecord.fail();
             } else {
-                if (instanceConfig.getFunctionDetails().getAutoAck()) {
-                    // the function doesn't produce any result or the user doesn't want the result.
-                    srcRecord.ack();
+                if (result1.getResult() != null) {
+                    sendOutputMessage(srcRecord, result1.getResult());
+                } else {
+                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                        // the function doesn't produce any result or the user doesn't want the result.
+                        srcRecord.ack();
+                    }
                 }
+                // increment total successfully processed
+                stats.incrTotalProcessedSuccessfully();
             }
-            // increment total successfully processed
-            stats.incrTotalProcessedSuccessfully();
-        }
+        });
     }
 
     private void sendOutputMessage(Record srcRecord, Object output) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 0cb361d..5061d1e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -22,10 +22,14 @@ import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class JavaInstanceTest {
 
     /**
@@ -33,14 +37,94 @@ public class JavaInstanceTest {
      * @throws Exception
      */
     @Test
-    public void testLambda() {
+    public void testLambda() throws Exception {
         JavaInstance instance = new JavaInstance(
-            mock(ContextImpl.class),
-            (Function<String, String>) (input, context) -> input + "-lambda");
+                mock(ContextImpl.class),
+                (Function<String, String>) (input, context) -> input + "-lambda",
+                new InstanceConfig());
         String testString = "ABC123";
-        JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
-        assertNotNull(result.getResult());
-        assertEquals(new String(testString + "-lambda"), result.getResult());
+        CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+        assertNotNull(result.get().getResult());
+        assertEquals(new String(testString + "-lambda"), result.get().getResult());
+        instance.close();
+    }
+
+    @Test
+    public void testAsyncFunction() throws Exception {
+        InstanceConfig instanceConfig = new InstanceConfig();
+
+        Function<String, CompletableFuture<String>> function = (input, context) -> {
+            log.info("input string: {}", input);
+            CompletableFuture<String> result  = new CompletableFuture<>();
+            Executors.newCachedThreadPool().submit(() -> {
+                try {
+                    Thread.sleep(500);
+                    result.complete(String.format("%s-lambda", input));
+                } catch (Exception e) {
+                    result.completeExceptionally(e);
+                }
+            });
+
+            return result;
+        };
+
+        JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                function,
+                instanceConfig);
+        String testString = "ABC123";
+        CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+        assertNotNull(result.get().getResult());
+        assertEquals(new String(testString + "-lambda"), result.get().getResult());
+        instance.close();
+    }
+
+    @Test
+    public void testAsyncFunctionMaxPending() throws Exception {
+        InstanceConfig instanceConfig = new InstanceConfig();
+        int pendingQueueSize = 2;
+        instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
+
+        Function<String, CompletableFuture<String>> function = (input, context) -> {
+            log.info("input string: {}", input);
+            CompletableFuture<String> result  = new CompletableFuture<>();
+            Executors.newCachedThreadPool().submit(() -> {
+                try {
+                    Thread.sleep(500);
+                    result.complete(String.format("%s-lambda", input));
+                } catch (Exception e) {
+                    result.completeExceptionally(e);
+                }
+            });
+
+            return result;
+        };
+
+        JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                function,
+                instanceConfig);
+        String testString = "ABC123";
+
+        long startTime = System.currentTimeMillis();
+        assertEquals(pendingQueueSize, instance.getPendingAsyncRequests().remainingCapacity());
+        CompletableFuture<JavaExecutionResult> result1 = instance.handleMessage(mock(Record.class), testString);
+        assertEquals(pendingQueueSize - 1, instance.getPendingAsyncRequests().remainingCapacity());
+        CompletableFuture<JavaExecutionResult> result2 = instance.handleMessage(mock(Record.class), testString);
+        assertEquals(pendingQueueSize - 2, instance.getPendingAsyncRequests().remainingCapacity());
+        CompletableFuture<JavaExecutionResult> result3 = instance.handleMessage(mock(Record.class), testString);
+        // no space left
+        assertEquals(0, instance.getPendingAsyncRequests().remainingCapacity());
+
+        instance.getPendingAsyncRequests().remainingCapacity();
+        assertNotNull(result1.get().getResult());
+        assertNotNull(result2.get().getResult());
+        assertNotNull(result3.get().getResult());
+
+        assertEquals(new String(testString + "-lambda"), result1.get().getResult());
+        long endTime = System.currentTimeMillis();
+
+        log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
         instance.close();
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java
new file mode 100644
index 0000000..b70bc7c
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+public class AsyncContextFunction implements Function<String, CompletableFuture<Void>> {
+    @Override
+    public CompletableFuture<Void> process(String input, Context context) {
+        Logger LOG = context.getLogger();
+        CompletableFuture<Void> future = new CompletableFuture();
+
+        // this method only delay a function execute.
+        Executors.newCachedThreadPool().submit(() -> {
+            try {
+                Thread.sleep(500);
+            } catch (Exception e) {
+                LOG.error("Exception when Thread.sleep", e);
+                future.completeExceptionally(e);
+            }
+
+            String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
+            String funcName = context.getFunctionName();
+
+            String logMessage = String
+                    .format("A message with value of \"%s\" has arrived on one of the following topics: %s\n",
+                            input, inputTopics);
+            LOG.info(logMessage);
+
+            String metricName = String.format("function-%s-messages-received", funcName);
+            context.recordMetric(metricName, 1);
+
+            future.complete(null);
+        });
+
+        return future;
+    }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java
new file mode 100644
index 0000000..7cad46b
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+
+public class JavaNativeAsyncExclamationFunction implements Function<String, CompletableFuture<String>> {
+    @Override
+    public CompletableFuture<String> apply(String input) {
+        CompletableFuture<String> future = new CompletableFuture();
+
+        Executors.newCachedThreadPool().submit(() -> {
+            try {
+                Thread.sleep(500);
+                future.complete(String.format("%s-!!", input));
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        });
+
+        return future;
+    }
+}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 244a757..a606d3b 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -358,6 +358,7 @@ public class LocalRunner {
                 instanceConfig.setMaxBufferedTuples(1024);
                 instanceConfig.setPort(FunctionCommon.findAvailablePort());
                 instanceConfig.setClusterName("local");
+                instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
                         userCodeFile,
@@ -417,6 +418,7 @@ public class LocalRunner {
             instanceConfig.setMaxBufferedTuples(1024);
             instanceConfig.setPort(FunctionCommon.findAvailablePort());
             instanceConfig.setClusterName("local");
+            instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
             RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                     instanceConfig,
                     userCodeFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index ec2e36a..970047f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -125,6 +125,9 @@ public class JavaInstanceStarter implements AutoCloseable {
     @Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
     public String clusterName;
 
+    @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false)
+    public int maxPendingAsyncRequests = 1000;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -147,6 +150,7 @@ public class JavaInstanceStarter implements AutoCloseable {
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
         instanceConfig.setClusterName(clusterName);
+        instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
         Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
         if (functionDetailsJsonString.charAt(0) == '\'') {
             functionDetailsJsonString = functionDetailsJsonString.substring(1);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 01dff75..6d4619f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -404,6 +404,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private Map<String, Object> runtimeCustomizerConfig = Collections.emptyMap();
 
+    @FieldContext(
+            doc = "Max pending async requests per instance to avoid large number of concurrent requests."
+                  + "Only used in AsyncFunction. Default: 1000"
+    )
+    private int maxPendingAsyncRequests = 1000;
+
     public String getFunctionMetadataTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index c1b4971..36d9f13 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -185,6 +185,7 @@ public class FunctionActioner {
         instanceConfig.setPort(FunctionCommon.findAvailablePort());
         instanceConfig.setClusterName(clusterName);
         instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
+        instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests());
         return instanceConfig;
     }