You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/11/12 17:18:34 UTC

[pulsar] branch master updated: Improve function state workflow with timeouts (#8528)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebfbf5b  Improve function state workflow with timeouts (#8528)
ebfbf5b is described below

commit ebfbf5bf8a77f14c1192132889b01ba14ba2fe76
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Nov 12 09:18:06 2020 -0800

    Improve function state workflow with timeouts (#8528)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../functions/instance/JavaInstanceRunnable.java   | 65 ++++++++++++----------
 1 file changed, 37 insertions(+), 28 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 6b27868..fd4697c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,20 +19,20 @@
 
 package org.apache.pulsar.functions.instance;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
@@ -57,9 +57,9 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.CryptoConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
@@ -73,7 +73,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
 import org.apache.pulsar.functions.utils.CryptoUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -84,14 +83,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+import java.util.concurrent.TimeoutException;
 
 /**
  * A function container implemented using java thread.
@@ -250,7 +249,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     public void run() {
         try {
             setup();
-            
+
             while (true) {
                 currentRecord = readInput();
 
@@ -335,29 +334,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         return fnClassLoader;
     }
 
-    private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
-    	try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
-             StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).build(),
-             ClientResources.create().scheduler())){
+    private void createStateTableIfNotExist(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
+        try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
+          settings,
+          ClientResources.create().scheduler())) {
             StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
-                .setInitialNumRanges(4)
-                .setMinNumRanges(4)
-                .setStorageType(StorageType.TABLE)
-                .build();
+              .setInitialNumRanges(4)
+              .setMinNumRanges(4)
+              .setStorageType(StorageType.TABLE)
+              .build();
             Stopwatch elapsedWatch = Stopwatch.createStarted();
-            while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
+            Exception lastException = null;
+            while (true) {
                 try {
-                    result(storageAdminClient.getStream(tableNs, tableName));
+                    result(storageAdminClient.getStream(tableNs, tableName), 30, TimeUnit.SECONDS);
                     return;
+                } catch (TimeoutException e){
+                    lastException = e;
                 } catch (NamespaceNotFoundException nnfe) {
                     try {
                         result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
-                            .setDefaultStreamConf(streamConf)
-                            .build()));
+                          .setDefaultStreamConf(streamConf)
+                          .build()));
                         result(storageAdminClient.createStream(tableNs, tableName, streamConf));
                     } catch (Exception e) {
                         // there might be two clients conflicting at creating table, so let's retrieve the table again
                         // to make sure the table is created.
+                        lastException = e;
                     }
                 } catch (StreamNotFoundException snfe) {
                     try {
@@ -365,12 +368,20 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     } catch (Exception e) {
                         // there might be two client conflicting at creating table, so let's retrieve it to make
                         // sure the table is created.
+                        lastException = e;
                     }
                 } catch (ClientException ce) {
+                    lastException = ce;
                     log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
-                        ce.getMessage());
+                      ce.getMessage());
                     TimeUnit.MILLISECONDS.sleep(100);
                 }
+                if (elapsedWatch.elapsed(TimeUnit.MINUTES) > 1) {
+                    if (lastException != null) {
+                        throw new RuntimeException("Failed to get or create state table within timeout", lastException);
+                    }
+                    throw new RuntimeException("Failed to get or create state table within timeout");
+                }
             }
         }
     }
@@ -399,13 +410,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 .build();
 
         // we defer creation of the state table until a java instance is running here.
-        createStateTable(tableNs, tableName, settings);
+        createStateTableIfNotExist(tableNs, tableName, settings);
 
         log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName());
-        this.storageClient = StorageClientBuilder.newBuilder()
-                .withSettings(settings)
-                .withNamespace(tableNs)
-                .build();
+        this.storageClient = new SimpleStorageClientImpl(tableNs, settings);
+
         // NOTE: this is a workaround until we bump bk version to 4.9.0
         // table might just be created above, so it might not be ready for serving traffic
         Stopwatch openSw = Stopwatch.createStarted();