Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2023/03/29 12:10:19 UTC

[GitHub] [pulsar] raymondBourges edited a discussion: "cannot be cast to class" in a pulsar function producing a new message

GitHub user raymondBourges edited a discussion: "cannot be cast to class" in a pulsar function producing a new message

Hi all,

I'm continuing to explore Pulsar.

I'm trying to make a function that reroutes a message if a check fails.

Here is how the function is defined:
```
pulsar-admin functions create \
  --jar /path/to/java/Personne/target/veille-stream-data-0.1.0.jar \
  --classname fr.ur.mdm.personne.PulsarFonctionPersonne \
  --tenant public \
  --namespace poc-personne \
  --name personne-erreur-fonction \
  --inputs persistent://public/poc-personne/personne \
  --log-topic persistent://public/poc-personne/log
```

Here is my code:
```java
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class PulsarFonctionPersonne implements Function<Personne, Personne> {

  @Override
  public Personne process(Personne personne, Context context) throws Exception {
    Logger LOG = context.getLogger();
    LOG.warn("TRACE 1 --> " + personne.getNom());
    try {
      new PersonneValidateur().valider(personne);
    }
    catch(VerificateurException e) {
      LOG.warn("TRACE 2 --> " + e.getMessage());
      context.newOutputMessage("public/poc-personne/personne-erreur", AvroSchema.of(Personne.class))
        .value(personne)
        .property("messageErreur", e.getMessage())
        .send();
      LOG.warn("TRACE 3");
    }
    return personne;
  }

}
```

But I get these messages in the log topic:
```
***************
TRACE 1 --> 
***************
TRACE 2 --> 
***************
Starting Pulsar producer perf with config: {"topicName":"public/poc-personne/personne-erreur","producerName":null,"sendTimeoutMs":0,"blockIfQueueFull":true,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"CustomPartition","hashingScheme":"Murmur3_32Hash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":10000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"LZ4","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{"application":"pulsar-function","id":"public/poc-personne/personne-erreur-fonction","instance_hostname":"5ec86a11c2f1","instance_id":"0"},"initialSubscriptionName":null}
***************
Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":true,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
***************
[public/poc-personne/personne-erreur] [null] Creating producer on cnx [id: 0x714c5afb, L:/127.0.0.1:54522 - R:localhost/127.0.0.1:6650]
***************
[public/poc-personne/personne-erreur] [null] Failed to create producer: class org.apache.pulsar.client.impl.schema.SchemaInfoImpl cannot be cast to class org.apache.pulsar.client.impl.schema.SchemaInfoImpl (org.apache.pulsar.client.impl.schema.SchemaInfoImpl is in unnamed module of loader org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders$ParentFirstClassLoader @2f1ea80d; org.apache.pulsar.client.impl.schema.SchemaInfoImpl is in unnamed module of loader java.net.URLClassLoader @2626b418)
***************
Encountered exception when processing message PulsarRecord(topicName=Optional[persistent://public/poc-personne/personne], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@26d4c770], schema=org.apache.pulsar.client.impl.schema.AvroSchema@42be7bf0, failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$277/0x0000000801088c08@5494d099, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$276/0x00000008010889e0@59c0d1ee, customAckFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$275/0x00000008010887a8@21f0525b)
```
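
For anyone reading the "Failed to create producer" line: it names the same class, `SchemaInfoImpl`, twice, once per class loader. The JVM treats a class as the pair (name, defining loader), so two copies with the same name are unrelated types and a cast between them fails. Below is a minimal sketch of that mechanism in plain Java, not Pulsar code. `ClassLoaderCastDemo`, `Payload` and `IsolatingLoader` are hypothetical names invented for the illustration, and it assumes the compiled classes are readable as resources from the classpath. It says nothing about how the second copy of the class got loaded in this setup.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a class that ends up loaded by two loaders. */
    public static class Payload { }

    /** Loader that defines the target class itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                // Everything else (java.lang.Object, ...) comes from the parent as usual.
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        // Same bytes, same name, but a different defining loader.
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                return c;
            }
        }
    }

    /** Returns true if casting the second copy to the first copy's type throws. */
    static boolean castFails() throws Exception {
        String name = Payload.class.getName();
        ClassLoader app = ClassLoaderCastDemo.class.getClassLoader();
        Class<?> otherCopy = new IsolatingLoader(app, name).loadClass(name);
        Object instance = otherCopy.getDeclaredConstructor().newInstance();
        try {
            Payload p = (Payload) instance; // same name, different loader
            return false;
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("cast failed: " + castFails());
    }
}
```

The two loaders named in the log (`ParentFirstClassLoader` and `java.net.URLClassLoader`) play the roles of `IsolatingLoader` and the application loader in this sketch.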

I never see the "TRACE 3" log or my message in the destination topic :-(

I don't understand what I should do to avoid this "cannot be cast" error.

Can you help me?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/19962
