You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 06:49:49 UTC
[GitHub] [pulsar] Technoboy- opened a new pull request, #16155: [Pulsar SQL] Fix `messageQueue` release message issue.
Technoboy- opened a new pull request, #16155:
URL: https://github.com/apache/pulsar/pull/16155
### Motivation
When PulsarRecordCursor close, there may occur below error:
```
2022-06-08T09:33:33.959423259-04:00 2022-06-08T13:33:33.959Z INFO 20220608_133129_00353_ydmer.2.0-21-52 org.apache.pulsar.sql.presto.PulsarRecordCursor Init cacheSizeAllocator with NullCacheSizeAllocator.
2022-06-08T09:33:33.964070819-04:00 2022-06-08T13:33:33.963Z ERROR deserialize-thread-split-13 org.apache.pulsar.sql.presto.PulsarRecordCursor Stop running DeserializeEntries
2022-06-08T09:33:33.964094767-04:00 java.lang.IllegalArgumentException: newPosition > limit: (48825 > 119)
2022-06-08T09:33:33.964101767-04:00 at java.base/java.nio.Buffer.createPositionException(Buffer.java:318)
2022-06-08T09:33:33.964104898-04:00 at java.base/java.nio.Buffer.position(Buffer.java:293)
2022-06-08T09:33:33.964123765-04:00 at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1094)
2022-06-08T09:33:33.964126988-04:00 at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:184)
2022-06-08T09:33:33.964130488-04:00 at io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
2022-06-08T09:33:33.964133500-04:00 at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
2022-06-08T09:33:33.964136667-04:00 at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270)
2022-06-08T09:33:33.964139714-04:00 at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246)
2022-06-08T09:33:33.964142670-04:00 at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
2022-06-08T09:33:33.964145457-04:00 at org.apache.pulsar.common.api.proto.KeyValue.getValue(KeyValue.java:55)
2022-06-08T09:33:33.964148441-04:00 at org.apache.pulsar.common.api.proto.KeyValue.copyFrom(KeyValue.java:159)
2022-06-08T09:33:33.964150976-04:00 at org.apache.pulsar.common.api.proto.SingleMessageMetadata.copyFrom(SingleMessageMetadata.java:505)
2022-06-08T09:33:33.964155504-04:00 at org.apache.pulsar.common.api.raw.RawMessageImpl.get(RawMessageImpl.java:75)
2022-06-08T09:33:33.964158176-04:00 at org.apache.pulsar.common.api.raw.MessageParser.receiveIndividualMessagesFromBatch(MessageParser.java:176)
2022-06-08T09:33:33.964161161-04:00 at org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:112)
2022-06-08T09:33:33.964163699-04:00 at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:295)
2022-06-08T09:33:33.964166399-04:00 at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:273)
2022-06-08T09:33:33.964174886-04:00 at org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:266)
2022-06-08T09:33:33.964177766-04:00 at org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:239)
2022-06-08T09:33:33.964180504-04:00 at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries.run(PulsarRecordCursor.java:273)
2022-06-08T09:33:33.965527031-04:00 2022-06-08T13:33:33.963Z ERROR SplitRunner-0-47 io.prestosql.execution.executor.TaskExecutor Error processing Split 20220608_133129_00352_ydmer.2.0-7 PulsarSplit{splitId=13, connectorId='pulsar', originSchemaName='derived_margin_data', schemaName='qa-ipg/portfolio_finance', tableName='derived_margin_data', splitSize=37, schema='{"type":"record","namespace":"ASC.DerivedMarginData","name":"DerivedMarginDataItem","fields":[{"name":"ASCPermId","type":"int"},{"name":"ASCId","type":["null","string"],"default":null},{"name":"Ubs30AvgDtv","type":["null","double"],"default":null},{"name":"Ubs90Volatility","type":["null","double"],"default":null},{"name":"Jpm20AvgDtv","type":["null","double"],"default":null},{"name":"Jpm20Volatility","type":["null","double"],"default":null},{"name":"Empirical20Volatility","type":["null","double"],"default":null},{"name":"Dbk100AvgDtv","type":["null","double"],"default":null},{"name":"Dbk90AvgDtv","type":["null","double"],
"default":null},{"name":"Dbk20AvgDtv","type":["null","double"],"default":null},{"name":"Dbk10AvgDtv","type":["null","double"],"default":null},{"name":"Csf90AvgDtv","type":["null","double"],"default":null},{"name":"Csf60AvgDtv","type":["null","double"],"default":null},{"name":"Csf30AvgDtv","type":["null","double"],"default":null}]}', schemaType=JSON, startPositionEntryId=33, endPositionEntryId=70, startPositionLedgerId=3463486, endPositionLedgerId=3463486, schemaInfoProperties={}} (start = 2.296111546565531E9, wall = 147 ms, cpu = 0 ms, wait = 429 ms, calls = 1)
2022-06-08T09:33:33.965565080-04:00 java.nio.BufferUnderflowException
2022-06-08T09:33:33.965571091-04:00 at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
2022-06-08T09:33:33.965577921-04:00 at io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
2022-06-08T09:33:33.965582062-04:00 at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
2022-06-08T09:33:33.965586668-04:00 at io.netty.buffer.AbstractByteBuf.getBytes(AbstractByteBuf.java:490)
2022-06-08T09:33:33.965589527-04:00 at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:537)
2022-06-08T09:33:33.965592372-04:00 at org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:156)
2022-06-08T09:33:33.965595098-04:00 at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSchemaInfo(PulsarRecordCursor.java:662)
2022-06-08T09:33:33.965597713-04:00 at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:530)
2022-06-08T09:33:33.965600538-04:00 at io.prestosql.$gen.CursorProcessor_20220608_133326_33.process(Unknown Source)
2022-06-08T09:33:33.965603289-04:00 at io.prestosql.operator.ScanFilterAndProjectOperator$RecordCursorToPages.process(ScanFilterAndProjectOperator.java:323)
2022-06-08T09:33:33.965606919-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965610158-04:00 at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965612957-04:00 at io.prestosql.operator.WorkProcessorUtils.access$000(WorkProcessorUtils.java:37)
2022-06-08T09:33:33.965615585-04:00 at io.prestosql.operator.WorkProcessorUtils$YieldingProcess.process(WorkProcessorUtils.java:181)
2022-06-08T09:33:33.965618223-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965628121-04:00 at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965631636-04:00 at io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
2022-06-08T09:33:33.965635773-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965639680-04:00 at io.prestosql.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:277)
2022-06-08T09:33:33.965643475-04:00 at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:319)
2022-06-08T09:33:33.965646669-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965649650-04:00 at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:306)
2022-06-08T09:33:33.965652535-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965655504-04:00 at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965859342-04:00 at io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
2022-06-08T09:33:33.965867541-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965871003-04:00 at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965877994-04:00 at io.prestosql.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:215)
2022-06-08T09:33:33.965882107-04:00 at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965889844-04:00 at io.prestosql.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:149)
```
It's the same issue with #14379.
Because DeserializeEntries offer entries into `messageQueue` and will release relative entries after processing.
But when PulsarRecordCursor closes, it will also release entries in the `messageQueue`, so different threads have released the same entry to cause the above issue.
### Modification
- Release the related queue messages after `deserializeEntries` close.
### Documentation
- [x] `doc-not-needed`
(Please explain why)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] Technoboy- merged pull request #16155: [Pulsar SQL] Fix `messageQueue` release message issue.
Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #16155:
URL: https://github.com/apache/pulsar/pull/16155
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org