Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 06:49:49 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #16155: [Pulsar SQL] Fix `messageQueue` release message issue.

Technoboy- opened a new pull request, #16155:
URL: https://github.com/apache/pulsar/pull/16155

   ### Motivation
   When `PulsarRecordCursor` closes, the following errors may occur:
   ```
   2022-06-08T09:33:33.959423259-04:00 2022-06-08T13:33:33.959Z	INFO	20220608_133129_00353_ydmer.2.0-21-52	org.apache.pulsar.sql.presto.PulsarRecordCursor	Init cacheSizeAllocator with NullCacheSizeAllocator.
   2022-06-08T09:33:33.964070819-04:00 2022-06-08T13:33:33.963Z	ERROR	deserialize-thread-split-13	org.apache.pulsar.sql.presto.PulsarRecordCursor	Stop running DeserializeEntries
   2022-06-08T09:33:33.964094767-04:00 java.lang.IllegalArgumentException: newPosition > limit: (48825 > 119)
   2022-06-08T09:33:33.964101767-04:00 	at java.base/java.nio.Buffer.createPositionException(Buffer.java:318)
   2022-06-08T09:33:33.964104898-04:00 	at java.base/java.nio.Buffer.position(Buffer.java:293)
   2022-06-08T09:33:33.964123765-04:00 	at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1094)
   2022-06-08T09:33:33.964126988-04:00 	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:184)
   2022-06-08T09:33:33.964130488-04:00 	at io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
   2022-06-08T09:33:33.964133500-04:00 	at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
   2022-06-08T09:33:33.964136667-04:00 	at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270)
   2022-06-08T09:33:33.964139714-04:00 	at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246)
   2022-06-08T09:33:33.964142670-04:00 	at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
   2022-06-08T09:33:33.964145457-04:00 	at org.apache.pulsar.common.api.proto.KeyValue.getValue(KeyValue.java:55)
   2022-06-08T09:33:33.964148441-04:00 	at org.apache.pulsar.common.api.proto.KeyValue.copyFrom(KeyValue.java:159)
   2022-06-08T09:33:33.964150976-04:00 	at org.apache.pulsar.common.api.proto.SingleMessageMetadata.copyFrom(SingleMessageMetadata.java:505)
   2022-06-08T09:33:33.964155504-04:00 	at org.apache.pulsar.common.api.raw.RawMessageImpl.get(RawMessageImpl.java:75)
   2022-06-08T09:33:33.964158176-04:00 	at org.apache.pulsar.common.api.raw.MessageParser.receiveIndividualMessagesFromBatch(MessageParser.java:176)
   2022-06-08T09:33:33.964161161-04:00 	at org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:112)
   2022-06-08T09:33:33.964163699-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:295)
   2022-06-08T09:33:33.964166399-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:273)
   2022-06-08T09:33:33.964174886-04:00 	at org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:266)
   2022-06-08T09:33:33.964177766-04:00 	at org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:239)
   2022-06-08T09:33:33.964180504-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries.run(PulsarRecordCursor.java:273)
   
   2022-06-08T09:33:33.965527031-04:00 2022-06-08T13:33:33.963Z	ERROR	SplitRunner-0-47	io.prestosql.execution.executor.TaskExecutor	Error processing Split 20220608_133129_00352_ydmer.2.0-7 PulsarSplit{splitId=13, connectorId='pulsar', originSchemaName='derived_margin_data', schemaName='qa-ipg/portfolio_finance', tableName='derived_margin_data', splitSize=37, schema='{"type":"record","namespace":"ASC.DerivedMarginData","name":"DerivedMarginDataItem","fields":[{"name":"ASCPermId","type":"int"},{"name":"ASCId","type":["null","string"],"default":null},{"name":"Ubs30AvgDtv","type":["null","double"],"default":null},{"name":"Ubs90Volatility","type":["null","double"],"default":null},{"name":"Jpm20AvgDtv","type":["null","double"],"default":null},{"name":"Jpm20Volatility","type":["null","double"],"default":null},{"name":"Empirical20Volatility","type":["null","double"],"default":null},{"name":"Dbk100AvgDtv","type":["null","double"],"default":null},{"name":"Dbk90AvgDtv","type":["null","double"],"default":null},{"name":"Dbk20AvgDtv","type":["null","double"],"default":null},{"name":"Dbk10AvgDtv","type":["null","double"],"default":null},{"name":"Csf90AvgDtv","type":["null","double"],"default":null},{"name":"Csf60AvgDtv","type":["null","double"],"default":null},{"name":"Csf30AvgDtv","type":["null","double"],"default":null}]}', schemaType=JSON, startPositionEntryId=33, endPositionEntryId=70, startPositionLedgerId=3463486, endPositionLedgerId=3463486, schemaInfoProperties={}} (start = 2.296111546565531E9, wall = 147 ms, cpu = 0 ms, wait = 429 ms, calls = 1)
   2022-06-08T09:33:33.965565080-04:00 java.nio.BufferUnderflowException
   2022-06-08T09:33:33.965571091-04:00 	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
   2022-06-08T09:33:33.965577921-04:00 	at io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
   2022-06-08T09:33:33.965582062-04:00 	at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
   2022-06-08T09:33:33.965586668-04:00 	at io.netty.buffer.AbstractByteBuf.getBytes(AbstractByteBuf.java:490)
   2022-06-08T09:33:33.965589527-04:00 	at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:537)
   2022-06-08T09:33:33.965592372-04:00 	at org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:156)
   2022-06-08T09:33:33.965595098-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSchemaInfo(PulsarRecordCursor.java:662)
   2022-06-08T09:33:33.965597713-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:530)
   2022-06-08T09:33:33.965600538-04:00 	at io.prestosql.$gen.CursorProcessor_20220608_133326_33.process(Unknown Source)
   2022-06-08T09:33:33.965603289-04:00 	at io.prestosql.operator.ScanFilterAndProjectOperator$RecordCursorToPages.process(ScanFilterAndProjectOperator.java:323)
   2022-06-08T09:33:33.965606919-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965610158-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965612957-04:00 	at io.prestosql.operator.WorkProcessorUtils.access$000(WorkProcessorUtils.java:37)
   2022-06-08T09:33:33.965615585-04:00 	at io.prestosql.operator.WorkProcessorUtils$YieldingProcess.process(WorkProcessorUtils.java:181)
   2022-06-08T09:33:33.965618223-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965628121-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965631636-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
   2022-06-08T09:33:33.965635773-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965639680-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:277)
   2022-06-08T09:33:33.965643475-04:00 	at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:319)
   2022-06-08T09:33:33.965646669-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965649650-04:00 	at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:306)
   2022-06-08T09:33:33.965652535-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965655504-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965859342-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
   2022-06-08T09:33:33.965867541-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965871003-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965877994-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:215)
   2022-06-08T09:33:33.965882107-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965889844-04:00 	at io.prestosql.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:149)
   ```
   
   This is the same issue as #14379.
   
   `DeserializeEntries` offers entries into `messageQueue` and releases the corresponding entries after processing them.
   When `PulsarRecordCursor` closes, it also releases the entries in `messageQueue`, so two different threads can release the same entry, which causes the errors above.
   
   ### Modification
   
   - Release the messages remaining in the queue only after `deserializeEntries` has closed.
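
The ordering described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual patch: `SketchMessage`, `SketchCursor`, `deserializeLoop`, and `queued` are hypothetical names, the worker only loosely stands in for the deserialize thread, and a plain `ArrayBlockingQueue` replaces the JCTools queue seen in the stack trace. The point is that `close()` first stops and joins the worker, and only then drains the queue, so each message is released by exactly one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted message; stands in for a pooled buffer. */
final class SketchMessage {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    /** Throws if the message was already released by someone else. */
    void release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("message released twice");
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}

/** Hypothetical cursor: one worker thread fills messageQueue until close(). */
final class SketchCursor implements AutoCloseable {
    private final BlockingQueue<SketchMessage> messageQueue = new ArrayBlockingQueue<>(16);
    // Written only by the worker; safe to read once the worker has been joined.
    private final List<SketchMessage> created = new ArrayList<>();
    private volatile boolean running = true;
    private final Thread worker;

    SketchCursor() {
        worker = new Thread(this::deserializeLoop, "deserialize-sketch");
        worker.start();
    }

    private void deserializeLoop() {
        while (running) {
            SketchMessage message = new SketchMessage();
            created.add(message);
            try {
                // A successful offer hands ownership to the queue.
                if (!messageQueue.offer(message, 10, TimeUnit.MILLISECONDS)) {
                    message.release(); // queue full: the worker still owns it
                }
            } catch (InterruptedException e) {
                message.release(); // never enqueued: the worker still owns it
                return;
            }
        }
    }

    @Override
    public void close() {
        // 1. Stop the worker and wait until it has really finished.
        running = false;
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            // Do not drain while the worker may still be running.
            Thread.currentThread().interrupt();
            return;
        }
        // 2. Only now release whatever is left in the queue.
        SketchMessage message;
        while ((message = messageQueue.poll()) != null) {
            message.release();
        }
    }

    int queued() {
        return messageQueue.size();
    }

    List<SketchMessage> created() {
        return created;
    }
}
```

Draining before the `join()` would let the closing thread and the worker operate on the queue at the same time, which is the window in which the double release reported above can happen.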
   
   ### Documentation
   
   
   - [x] `doc-not-needed` 
     


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #16155: [Pulsar SQL] Fix `messageQueue` release message issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #16155:
URL: https://github.com/apache/pulsar/pull/16155

