Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/04 11:45:31 UTC

[GitHub] [pulsar] Atiqul-Islam created a discussion: Pulsar Function Window Length Count

GitHub user Atiqul-Islam created a discussion: Pulsar Function Window Length Count

I am trying to write a Pulsar Function in Python that executes once for every 10 samples (messages) of data.

Below is the sandbox code for the Pulsar Function:
```
from pulsar import Function

class Func(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input
```
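For comparison, here is a minimal sketch of getting "once every 10 samples" behaviour from an ordinary, non-windowed function like the one above. The function keeps its own in-memory buffer and only returns a result on every 10th message. It rests on a few assumptions. A single instance must see all messages in order (parallelism 1, as in the log below). Returning `None` from `process` is assumed to publish nothing to the output topic. The `BatchOfTen` class, the `BATCH_SIZE` constant and the count/sum summary are hypothetical illustrations, not part of the original question.

```python
import json

try:
    from pulsar import Function
except ImportError:
    # Lets this sketch run without the pulsar package installed.
    Function = object

BATCH_SIZE = 10  # hypothetical: the intended number of samples per batch


class BatchOfTen(Function):
    """Hypothetical function that emits one result per BATCH_SIZE inputs."""

    def __init__(self):
        # Lives only in process memory; lost if the instance restarts.
        self.buffer = []

    def process(self, input, context):
        # Each message from the producer is JSON such as {"num": 7};
        # json.loads accepts both str and bytes.
        self.buffer.append(json.loads(input)['num'])
        if len(self.buffer) < BATCH_SIZE:
            return None  # assumed: a None result produces no output message
        batch, self.buffer = self.buffer, []
        return json.dumps({'count': len(batch), 'sum': sum(batch)})
```

Because the buffer is plain instance state, up to 9 buffered messages would be dropped whenever the instance is restarted.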

I am producing sequential test data to the input topic with the following code:
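```python
import json

import pulsar

client = pulsar.Client('pulsar://127.0.0.1:6650')
producer = client.create_producer('persistent://public/default/in')

i = 0
try:
    # Publish an endless stream of JSON payloads: {"num": 0}, {"num": 1}, ...
    while True:
        producer.send(json.dumps({'num': i}).encode('utf8'))
        i += 1
except KeyboardInterrupt:
    pass  # stop producing on Ctrl+C
finally:
    # Always close the client, even when the loop is interrupted.
    client.close()
```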

I am running the function locally with the following bash command:

```
/pulsar/bin/pulsar-admin functions localrun --tenant public --namespace default --py /home/Func.zip --classname Func.Func --inputs persistent://public/default/in --output persistent://public/default/out1 --window-length-count 10
```
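The `--function_details` in the log below show that `--window-length-count 10` becomes `"windowLengthCount":10` together with `"slidingIntervalCount":10`. In other words, it is a window of 10 messages that advances by 10 messages, so consecutive windows do not overlap. The pure-Python sketch below only illustrates that count-based grouping on the producer's `{'num': i}` payloads. It is not Pulsar's implementation, and the `count_windows` helper is hypothetical.

```python
import json
from collections import deque


def count_windows(messages, length_count, sliding_count):
    """Hypothetical helper: every `sliding_count` messages, yield the most
    recent (up to) `length_count` messages as one window."""
    window = deque(maxlen=length_count)  # keeps only the newest messages
    pending = 0  # messages received since the last window was emitted
    for msg in messages:
        window.append(msg)
        pending += 1
        if pending == sliding_count:
            yield list(window)
            pending = 0


if __name__ == '__main__':
    payloads = [json.dumps({'num': i}) for i in range(25)]
    for w in count_windows(payloads, length_count=10, sliding_count=10):
        print([json.loads(p)['num'] for p in w])
    # Prints the numbers 0..9, then 10..19; 20..24 wait for a third window.
```

With both counts equal to 10, each window is exactly the next 10 messages. With a smaller `sliding_count`, the windows would overlap.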

However, I am getting the following error:
```
root@098678fbe93d://# /pulsar/bin/pulsar-admin functions localrun --tenant public --namespace default --py /home/Func.zip --classname Func.Func --inputs persistent://public/default/in --output persistent://public/default/out1 --window-length-count 10 
2022-10-04T00:05:30,436+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory - Java instance jar location is not defined, using the location defined in system environment : /pulsar/instances/java-instance.jar
2022-10-04T00:05:30,441+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory - Python instance file location is not defined using the location defined in system environment : /pulsar/instances/python-instance/python_instance_main.py
2022-10-04T00:05:30,442+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory - No extra dependencies location is defined in either function worker config or system environment
2022-10-04T00:05:30,524+0000 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/Func-0 RuntimeSpawner starting function
2022-10-04T00:05:30,525+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Creating function log directory /pulsar/logs/functions/public/default/Func
2022-10-04T00:05:30,525+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Created or found function log directory /pulsar/logs/functions/public/default/Func
2022-10-04T00:05:30,526+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - ProcessBuilder starting the process with args python /pulsar/instances/python-instance/python_instance_main.py --py /home/Func.zip --logging_directory /pulsar/logs/functions --logging_file Func --logging_config_file /pulsar/conf/functions-logging/logging_config.ini --instance_id 0 --function_id b4b248ce-ebb2-4c1b-8291-3147804f4491 --function_version 7531c27f-fbe1-49f5-971e-b08a4e063ec3 --function_details '{"tenant":"public","namespace":"default","name":"Func","className":"org.apache.pulsar.functions.windowing.WindowFunctionExecutor","userConfig":"{\"__WINDOWCONFIGS__\":{\"windowLengthCount\":10,\"slidingIntervalCount\":10,\"actualWindowFunctionClassName\":\"Func.Func\"}}","runtime":"PYTHON","parallelism":1,"source":{"inputSpecs":{"persistent://public/default/in":{}},"cleanupSubscription":true},"sink":{"topic":"persistent://public/default/out1","forwardSourceMessageProperty":true},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}' --pulsar_serviceurl pulsar://localhost:6650 --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 41073 --metrics_port 43675 --expected_healthcheck_interval 30 --secrets_provider secretsprovider.ClearTextSecretsProvider --cluster_name local
2022-10-04T00:05:30,546+0000 [main] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Started process successfully
2022-10-04 00:05:30.748 INFO  [140305123407680] Client:88 | Subscribing on Topic :persistent://public/default/in
2022-10-04 00:05:30.748 INFO  [140305060529920] ExecutorService:41 | Run io_service in a single thread
2022-10-04 00:05:30.748 INFO  [140305123407680] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-10-04 00:05:30.748 INFO  [140305123407680] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-10-04 00:05:30.749 INFO  [140305060529920] ClientConnection:375 | [127.0.0.1:48066 -> 127.0.0.1:6650] Connected to broker
2022-10-04 00:05:30.757 INFO  [140305060529920] HandlerBase:64 | [persistent://public/default/in, public/default/Func, 0] Getting connection from pool
2022-10-04 00:05:30.757 INFO  [140304837506816] ExecutorService:41 | Run io_service in a single thread
2022-10-04 00:05:30.760 INFO  [140305060529920] ConsumerImpl:224 | [persistent://public/default/in, public/default/Func, 0] Created consumer on broker [127.0.0.1:48066 -> 127.0.0.1:6650] 
2022-10-04T00:06:01,082+0000 [function-timer-thread-1-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Health check failed for Func-0
java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.lambda$start$1(ProcessRuntime.java:184) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) [org.apache.pulsar-pulsar-common-2.10.1.jar:2.10.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
        at io.grpc.Status.asRuntimeException(Status.java:535) ~[io.grpc-grpc-api-1.45.1.jar:1.45.1]
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533) ~[io.grpc-grpc-stub-1.45.1.jar:1.45.1]
        at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[io.grpc-grpc-core-1.45.1.jar:1.45.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        ... 1 more
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:41073
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
        at io.grpc.netty.shaded.io.netty.channel.unix.Errors.newConnectException0(Errors.java:155) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:320) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.45.1.jar:1.45.1]
        ... 1 more
2022-10-04T00:06:01,103+0000 [function-timer-thread-1-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Extracted Process death exception
java.lang.RuntimeException: 
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:400) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.isAlive(ProcessRuntime.java:387) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) [org.apache.pulsar-pulsar-common-2.10.1.jar:2.10.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
        at java.lang.Thread.run(Thread.java:829) [?:?]
2022-10-04T00:06:01,104+0000 [function-timer-thread-1-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/Func Function Container is dead with following exception. Restarting.
java.lang.RuntimeException: 
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:400) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.functions.runtime.process.ProcessRuntime.isAlive(ProcessRuntime.java:387) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.1.jar:2.10.1]
        at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) [org.apache.pulsar-pulsar-common-2.10.1.jar:2.10.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
        at java.lang.Thread.run(Thread.java:829) [?:?]
2022-10-04T00:06:01,108+0000 [Timer-0] INFO  org.apache.pulsar.functions.LocalRunner - {
  "failureException": "UNAVAILABLE: io exception",
  "instanceId": "0"
}
```

However, the function works fine when I remove the `--window-length-count` flag from the command.
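One detail in the `ProcessBuilder` log line above stands out. With `--window-length-count` set, the `function_details` passed to the Python instance carry `"className":"org.apache.pulsar.functions.windowing.WindowFunctionExecutor"`, which is a Java class name. The user's `Func.Func` appears only as `actualWindowFunctionClassName` inside `userConfig`. The sketch below parses an excerpt of that JSON, copied from the log with the source, sink and resources fields omitted, to make the substitution explicit. The log alone does not establish whether the Python runtime can load such a class.

```python
import json

# Excerpt of the --function_details argument from the log above.
FUNCTION_DETAILS = (
    r'{"tenant":"public","namespace":"default","name":"Func",'
    r'"className":"org.apache.pulsar.functions.windowing.WindowFunctionExecutor",'
    r'"userConfig":"{\"__WINDOWCONFIGS__\":{\"windowLengthCount\":10,'
    r'\"slidingIntervalCount\":10,\"actualWindowFunctionClassName\":\"Func.Func\"}}",'
    r'"runtime":"PYTHON","parallelism":1}'
)

details = json.loads(FUNCTION_DETAILS)
# userConfig is itself a JSON document encoded as a string.
window_config = json.loads(details['userConfig'])['__WINDOWCONFIGS__']

print(details['runtime'])                              # PYTHON
print(details['className'])                            # the Java WindowFunctionExecutor
print(window_config['actualWindowFunctionClassName'])  # Func.Func
```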

GitHub link: https://github.com/apache/pulsar/discussions/17925
