You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/12/20 17:28:36 UTC
[GitHub] [pinot] aleksdikanski edited a comment on issue #7270: Fix the issue with "pinot-pulsar" module (potentially library conflicts)
aleksdikanski edited a comment on issue #7270:
URL: https://github.com/apache/pinot/issues/7270#issuecomment-998128028
Sure @KKcorps, I basically followed the guides on the Pulsar and Pinot websites:
- set up a pulsar standalone cluster using https://pulsar.apache.org/docs/en/standalone-docker/#start-pulsar-in-docker
**Please note** that, for this demo setup, I ran the pulsar container in the same network as the pinot cluster and gave the pulsar container a name (basically, add `--network <pinot-network> --name pulsar` to the docker run command).
- set up a pinot cluster using the manual setup https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker#manual-cluster
- add a pulsar topic (it didn't matter if it was partitioned or not):
```
$ docker run -it \
--rm \
--network <pinot-network> \
-p 127.0.0.1:6650:6650 \
-p 127.0.0.1:8080:8080 \
apachepulsar/pulsar:2.8.1 \
bin/pulsar-admin create persistent://public/default/pinot
```
- add a table and schema to pinot using the following table and schema declarations:
table.json
```
{
"tableName": "airlineStats",
"tableType": "REALTIME",
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"segmentsConfig": {
"schemaName": "airlineStats",
"timeColumnName": "DaysSinceEpoch",
"replication": "1",
"replicasPerPartition": "1",
"timeType": "DAYS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "365",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "pulsar",
"stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
"stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
"stream.pulsar.consumer.type": "lowlevel",
"stream.pulsar.topic.name": "pinot",
"stream.pulsar.fetch.timeout.millis": "10000",
"stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
"stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
"realtime.segment.flush.threshold.size": "10000",
"realtime.segment.flush.threshold.time": "1h"
}
},
"metadata": {}
}
```
schema.json
```
{
"metricFieldSpecs": [
],
"dimensionFieldSpecs": [
{
"dataType": "INT",
"name": "ActualElapsedTime"
},
{
"dataType": "INT",
"name": "AirTime"
},
{
"dataType": "INT",
"name": "AirlineID"
},
{
"dataType": "INT",
"name": "ArrDel15"
},
{
"dataType": "INT",
"name": "ArrDelay"
},
{
"dataType": "INT",
"name": "ArrDelayMinutes"
},
{
"dataType": "INT",
"name": "ArrTime"
},
{
"dataType": "STRING",
"name": "ArrTimeBlk"
},
{
"dataType": "INT",
"name": "ArrivalDelayGroups"
},
{
"dataType": "INT",
"name": "CRSArrTime"
},
{
"dataType": "INT",
"name": "CRSDepTime"
},
{
"dataType": "INT",
"name": "CRSElapsedTime"
},
{
"dataType": "STRING",
"name": "CancellationCode"
},
{
"dataType": "INT",
"name": "Cancelled"
},
{
"dataType": "STRING",
"name": "Carrier"
},
{
"dataType": "INT",
"name": "CarrierDelay"
},
{
"dataType": "INT",
"name": "DayOfWeek"
},
{
"dataType": "INT",
"name": "DayofMonth"
},
{
"dataType": "INT",
"name": "DepDel15"
},
{
"dataType": "INT",
"name": "DepDelay"
},
{
"dataType": "INT",
"name": "DepDelayMinutes"
},
{
"dataType": "INT",
"name": "DepTime"
},
{
"dataType": "STRING",
"name": "DepTimeBlk"
},
{
"dataType": "INT",
"name": "DepartureDelayGroups"
},
{
"dataType": "STRING",
"name": "Dest"
},
{
"dataType": "INT",
"name": "DestAirportID"
},
{
"dataType": "INT",
"name": "DestAirportSeqID"
},
{
"dataType": "INT",
"name": "DestCityMarketID"
},
{
"dataType": "STRING",
"name": "DestCityName"
},
{
"dataType": "STRING",
"name": "DestState"
},
{
"dataType": "INT",
"name": "DestStateFips"
},
{
"dataType": "STRING",
"name": "DestStateName"
},
{
"dataType": "INT",
"name": "DestWac"
},
{
"dataType": "INT",
"name": "Distance"
},
{
"dataType": "INT",
"name": "DistanceGroup"
},
{
"dataType": "INT",
"name": "DivActualElapsedTime"
},
{
"dataType": "INT",
"name": "DivAirportIDs",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivAirportLandings"
},
{
"dataType": "INT",
"name": "DivAirportSeqIDs",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "DivAirports",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivArrDelay"
},
{
"dataType": "INT",
"name": "DivDistance"
},
{
"dataType": "INT",
"name": "DivLongestGTimes",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivReachedDest"
},
{
"dataType": "STRING",
"name": "DivTailNums",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivTotalGTimes",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivWheelsOffs",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivWheelsOns",
"singleValueField": false
},
{
"dataType": "INT",
"name": "Diverted"
},
{
"dataType": "INT",
"name": "FirstDepTime"
},
{
"dataType": "STRING",
"name": "FlightDate"
},
{
"dataType": "INT",
"name": "FlightNum"
},
{
"dataType": "INT",
"name": "Flights"
},
{
"dataType": "INT",
"name": "LateAircraftDelay"
},
{
"dataType": "INT",
"name": "LongestAddGTime"
},
{
"dataType": "INT",
"name": "Month"
},
{
"dataType": "INT",
"name": "NASDelay"
},
{
"dataType": "STRING",
"name": "Origin"
},
{
"dataType": "INT",
"name": "OriginAirportID"
},
{
"dataType": "INT",
"name": "OriginAirportSeqID"
},
{
"dataType": "INT",
"name": "OriginCityMarketID"
},
{
"dataType": "STRING",
"name": "OriginCityName"
},
{
"dataType": "STRING",
"name": "OriginState"
},
{
"dataType": "INT",
"name": "OriginStateFips"
},
{
"dataType": "STRING",
"name": "OriginStateName"
},
{
"dataType": "INT",
"name": "OriginWac"
},
{
"dataType": "INT",
"name": "Quarter"
},
{
"dataType": "STRING",
"name": "RandomAirports",
"singleValueField": false
},
{
"dataType": "INT",
"name": "SecurityDelay"
},
{
"dataType": "STRING",
"name": "TailNum"
},
{
"dataType": "INT",
"name": "TaxiIn"
},
{
"dataType": "INT",
"name": "TaxiOut"
},
{
"dataType": "INT",
"name": "Year"
},
{
"dataType": "INT",
"name": "WheelsOn"
},
{
"dataType": "INT",
"name": "WheelsOff"
},
{
"dataType": "INT",
"name": "WeatherDelay"
},
{
"dataType": "STRING",
"name": "UniqueCarrier"
},
{
"dataType": "INT",
"name": "TotalAddGTime"
}
],
"dateTimeFieldSpecs": [
{
"name": "DaysSinceEpoch",
"dataType": "INT",
"format": "1:DAYS:EPOCH",
"granularity": "1:DAYS"
}
],
"schemaName": "airlineStats"
}
```
- send a JSON message from the airline stats example using the pulsar-client, with the following file:
airlinestats00.json
```
{"Quarter":1,"FlightNum":1,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,
"DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,
"DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,
"DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA",
"DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,
"LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,
"DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York",
"FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY",
"OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,
"OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238,
"ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS",
"DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD",
"ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,
"Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,
"TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,
"TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,
"Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
```
Send this message using the producer client:
```
$ docker run -it \
--rm \
--network <pinot-network> \
-v <path/to/dir/containing_airlinestats00.json>:/pulsar/airline \
apachepulsar/pulsar:2.8.1 \
bin/pulsar-client produce -f airline/airlinestats00.json -k 0 -n 1 pinot
```
Funnily enough, I tested this today and actually got another error, different from the two mentioned above:
```
2021/12/20 16:26:21.807 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: airlineStats_REALTIME, partition: airlineStats__0__0__20211220T1626Z
java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1209) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:344) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:164) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:86) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
```
It is also related to the MessageId parsing and was also fixed by my implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org