Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2023/03/29 12:10:19 UTC
[GitHub] [pulsar] raymondBourges edited a discussion: "cannot be cast to class" in a pulsar function producing a new message
GitHub user raymondBourges edited a discussion: "cannot be cast to class" in a pulsar function producing a new message
Hi all,
I'm continuing to explore Pulsar.
I'm trying to make a function that reroutes a message if a check fails.
Here is how the function is defined:
```
pulsar-admin functions create \
--jar /path/to/java/Personne/target/veille-stream-data-0.1.0.jar \
--classname fr.ur.mdm.personne.PulsarFonctionPersonne \
--tenant public \
--namespace poc-personne \
--name personne-erreur-fonction \
--inputs persistent://public/poc-personne/personne \
--log-topic persistent://public/poc-personne/log
```
Here is my code:
```java
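import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class PulsarFonctionPersonne implements Function<Personne, Personne> {
    @Override
    public Personne process(Personne personne, Context context) throws Exception {
        Logger LOG = context.getLogger();
        LOG.warn("TRACE 1 --> " + personne.getNom());
        try {
            // Throws VerificateurException when the check fails.
            new PersonneValidateur().valider(personne);
        } catch (VerificateurException e) {
            LOG.warn("TRACE 2 --> " + e.getMessage());
            // Reroute the invalid message to the error topic, with the reason as a property.
            context.newOutputMessage("public/poc-personne/personne-erreur", AvroSchema.of(Personne.class))
                    .value(personne)
                    .property("messageErreur", e.getMessage())
                    .send();
            LOG.warn("TRACE 3");
        }
        return personne;
    }
}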
```
But I get these messages in the log topic:
```
***************
TRACE 1 -->
***************
TRACE 2 -->
***************
Starting Pulsar producer perf with config: {"topicName":"public/poc-personne/personne-erreur","producerName":null,"sendTimeoutMs":0,"blockIfQueueFull":true,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"CustomPartition","hashingScheme":"Murmur3_32Hash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":10000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"LZ4","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{"application":"pulsar-function","id":"public/poc-personne/personne-erreur-fonction","instance_hostname":"5ec86a11c2f1","instance_id":"0"},"initialSubscriptionName":null}
***************
Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":true,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
***************
[public/poc-personne/personne-erreur] [null] Creating producer on cnx [id: 0x714c5afb, L:/127.0.0.1:54522 - R:localhost/127.0.0.1:6650]
***************
[public/poc-personne/personne-erreur] [null] Failed to create producer: class org.apache.pulsar.client.impl.schema.SchemaInfoImpl cannot be cast to class org.apache.pulsar.client.impl.schema.SchemaInfoImpl (org.apache.pulsar.client.impl.schema.SchemaInfoImpl is in unnamed module of loader org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders$ParentFirstClassLoader @2f1ea80d; org.apache.pulsar.client.impl.schema.SchemaInfoImpl is in unnamed module of loader java.net.URLClassLoader @2626b418)
***************
Encountered exception when processing message PulsarRecord(topicName=Optional[persistent://public/poc-personne/personne], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@26d4c770], schema=org.apache.pulsar.client.impl.schema.AvroSchema@42be7bf0, failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$277/0x0000000801088c08@5494d099, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$276/0x00000008010889e0@59c0d1ee, customAckFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$275/0x00000008010887a8@21f0525b)
```
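As far as I can tell, the "Failed to create producer" line names the same class twice but with two different loaders (`ParentFirstClassLoader` and `URLClassLoader`), which looks like how the JVM reports two copies of one class loaded by separate class loaders. Here is a minimal standalone sketch of that general JVM behaviour. It is not Pulsar code: `LoaderDemo`, `Payload` and `IsolatingLoader` are names made up for illustration, and it assumes Java 9 or later (for `readAllBytes`) and that the compiled class bytes can be read as a resource.

```java
import java.io.IOException;
import java.io.InputStream;

public class LoaderDemo {

    /** Hypothetical stand-in for a library class such as SchemaInfoImpl. */
    public static class Payload {}

    /** Class loader that defines its own copy of Payload instead of asking its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final byte[] payloadBytes;

        IsolatingLoader(byte[] payloadBytes) {
            super(LoaderDemo.class.getClassLoader());
            this.payloadBytes = payloadBytes;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Payload.class.getName())) {
                return super.loadClass(name, resolve); // everything else: normal delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> copy = findLoadedClass(name);
                return copy != null ? copy : defineClass(name, payloadBytes, 0, payloadBytes.length);
            }
        }
    }

    /** Returns true if an instance of the second copy cannot be cast to the first copy. */
    public static boolean castFailsAcrossLoaders() {
        String resource = Payload.class.getName().replace('.', '/') + ".class";
        try (InputStream in = LoaderDemo.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalStateException("class bytes not found: " + resource);
            }
            // Load a second copy of Payload through the isolating loader.
            Class<?> copy = new IsolatingLoader(in.readAllBytes()).loadClass(Payload.class.getName());
            Object instance = copy.getDeclaredConstructor().newInstance();
            System.out.println("same name:  " + copy.getName().equals(Payload.class.getName()));
            System.out.println("same class: " + (copy == Payload.class));
            Payload p = (Payload) instance; // the cast under test
            return false;
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
            return true;
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cast failed: " + castFailsAcrossLoaders());
    }
}
```

If that is what happens in my function, the jar and the functions runtime would each be loading their own copy of `SchemaInfoImpl`, but that is only my reading of the message.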
I never see the "TRACE 3" log line, and my message never arrives in the destination topic :-(
I don't understand what I should do to avoid this "cannot be cast" error.
Can you help me?
Thanks
GitHub link: https://github.com/apache/pulsar/discussions/19962