You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/24 02:00:17 UTC

[GitHub] [pulsar] dhinesherode91 edited a comment on issue #8429: Cannot parse schema

dhinesherode91 edited a comment on issue #8429:
URL: https://github.com/apache/pulsar/issues/8429#issuecomment-867272525


   Hey,
   
   I am facing a similar issue.
   I am running pulsar 2.7.1 and I have a topic with AVRO schema where my producers push avro messages and consumer receives and process the avro data. My consumer application receives the message through pulsar client's MessageListener. All my topic messages are tagged with the latest avro schema version. And I could see that schema in the Pulsar cluster's SchemaRegistry.
   
   But I am facing the following issue when my consumer application starts and consumes message.
   `org.apache.pulsar.shade.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
   	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:86)
   	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:60)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.client.impl.TopicMessageImpl.getValue(TopicMessageImpl.java:143)
   	at com.kohia.galaxy.listener.MessageListenerImpl.received(MessageListenerImpl.kt:20)
   	at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:882)
   	at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$5(ConsumerBase.java:862)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
   	at org.apache.pulsar.shade.org.apache.avro.Schema.parse(Schema.java:1597)
   	at org.apache.pulsar.shade.org.apache.avro.Schema$Parser.parse(Schema.java:1396)
   	at org.apache.pulsar.shade.org.apache.avro.Schema$Parser.parse(Schema.java:1384)
   	at org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema(SchemaUtil.java:46)
   	at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:53)
   	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
   	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
   	at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
   	... 18 common frames omitted`
   	
   The above exception occurs for all the message that is received by the consumer.
   But when I restart my consumer application, it starts working fine and the above exception is not occurring.
   This issue occurs randomly when I restart my consumer application and when I again restarted it works fine and all the message are processed properly.
   Here is my subcription code,
   
   ```pulsarClient
               .newConsumer(AvroSchema.of(MyMetric::class))
               .topic(topicName).subscriptionName(subscriptionName)
               .subscriptionType(SubscriptionType.Shared)
               .receiverQueueSize(localQueueMsgCount)
               .messageListener(myMessageListener)
               .subscribe()```
               
    Here is my Listener code(short version),
    `import org.apache.pulsar.client.api.Consumer
   import org.apache.pulsar.client.api.Message
   import org.apache.pulsar.client.api.MessageListener
   import java.net.InetAddress
   class MyMessageListener<T>() : MessageListener<T> {
       override fun received(consumer: Consumer<T>, msg: Message<T>) {
           try {
   	    process(msg.value)
               consumer.acknowledge(msg)
           } catch (ex: Exception) {
               consumer.negativeAcknowledge(msg)
           }
       }
       fun process(metric:MyMetric){} 
   }`
   
   When I debug, I could find the following log at the time when my consumer application is working perfectly.
   `Load schema reader for version(2), 
   schema is : {"type":"record","name":"MyMetric","namespace":"com.example","fields":[{"name":"assetId","type":"long"},{"name":"utctime","type":{"type":"string","avro.java.string":"String"}}]}, 
   schemaInfo: {
     "name": "mytenant/mynamesapce/mytopic",
     "schema": {
       "type": "record",
       "name": "MyMetric",
       "namespace": "com.example",
       "fields": [
         {
           "name": "assetId",
           "type": "long"
         },
         {
           "name": "utctime",
           "type": {
             "type": "string",
             "avro.java.string": "String"
           }
         }
       ]
     },
    "type": "AVRO",
    "properties": {
       "__alwaysAllowNull": "true",
       "__jsr310ConversionEnabled": "false"
     }
   }`
   
   But at the time when my consumer application works weird, I could find the following log which doesn't have any schema info,
   `Load schema reader for version(2), 
   schema is : , 
   schemaInfo: {
     "name": "",
     "schema": "",
     "type": "NONE",
     "properties": {}
   }`
   
   Can u guys please help me resolve this issue. We are already in production and we find it diffficult to restart our consumer applications each and every time when this issue occurs.
   
   @robshep @congbobo184 Did you find any root cause for this issue or any way to fix it, if so kindly share so that it may help us.
   Eagerly waiting for your inputs. :) 
   
           


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org