You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/11/12 17:18:34 UTC
[pulsar] branch master updated: Improve function state workflow with timeouts (#8528)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebfbf5b Improve function state workflow with timeouts (#8528)
ebfbf5b is described below
commit ebfbf5bf8a77f14c1192132889b01ba14ba2fe76
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Nov 12 09:18:06 2020 -0800
Improve function state workflow with timeouts (#8528)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../functions/instance/JavaInstanceRunnable.java | 65 ++++++++++++----------
1 file changed, 37 insertions(+), 28 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 6b27868..fd4697c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,20 +19,20 @@
package org.apache.pulsar.functions.instance;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
@@ -57,9 +57,9 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
@@ -73,7 +73,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -84,14 +83,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+import java.util.concurrent.TimeoutException;
/**
* A function container implemented using java thread.
@@ -250,7 +249,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
public void run() {
try {
setup();
-
+
while (true) {
currentRecord = readInput();
@@ -335,29 +334,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return fnClassLoader;
}
- private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
- try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
- StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).build(),
- ClientResources.create().scheduler())){
+ private void createStateTableIfNotExist(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
+ try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
+ settings,
+ ClientResources.create().scheduler())) {
StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
- .setInitialNumRanges(4)
- .setMinNumRanges(4)
- .setStorageType(StorageType.TABLE)
- .build();
+ .setInitialNumRanges(4)
+ .setMinNumRanges(4)
+ .setStorageType(StorageType.TABLE)
+ .build();
Stopwatch elapsedWatch = Stopwatch.createStarted();
- while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
+ Exception lastException = null;
+ while (true) {
try {
- result(storageAdminClient.getStream(tableNs, tableName));
+ result(storageAdminClient.getStream(tableNs, tableName), 30, TimeUnit.SECONDS);
return;
+ } catch (TimeoutException e){
+ lastException = e;
} catch (NamespaceNotFoundException nnfe) {
try {
result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
- .setDefaultStreamConf(streamConf)
- .build()));
+ .setDefaultStreamConf(streamConf)
+ .build()));
result(storageAdminClient.createStream(tableNs, tableName, streamConf));
} catch (Exception e) {
// there might be two clients conflicting at creating table, so let's retrieve the table again
// to make sure the table is created.
+ lastException = e;
}
} catch (StreamNotFoundException snfe) {
try {
@@ -365,12 +368,20 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} catch (Exception e) {
// there might be two client conflicting at creating table, so let's retrieve it to make
// sure the table is created.
+ lastException = e;
}
} catch (ClientException ce) {
+ lastException = ce;
log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
- ce.getMessage());
+ ce.getMessage());
TimeUnit.MILLISECONDS.sleep(100);
}
+ if (elapsedWatch.elapsed(TimeUnit.MINUTES) > 1) {
+ if (lastException != null) {
+ throw new RuntimeException("Failed to get or create state table within timeout", lastException);
+ }
+ throw new RuntimeException("Failed to get or create state table within timeout");
+ }
}
}
}
@@ -399,13 +410,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
.build();
// we defer creation of the state table until a java instance is running here.
- createStateTable(tableNs, tableName, settings);
+ createStateTableIfNotExist(tableNs, tableName, settings);
log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName());
- this.storageClient = StorageClientBuilder.newBuilder()
- .withSettings(settings)
- .withNamespace(tableNs)
- .build();
+ this.storageClient = new SimpleStorageClientImpl(tableNs, settings);
+
// NOTE: this is a workaround until we bump bk version to 4.9.0
// table might just be created above, so it might not be ready for serving traffic
Stopwatch openSw = Stopwatch.createStarted();