Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/12/20 17:28:36 UTC

[GitHub] [pinot] aleksdikanski edited a comment on issue #7270: Fix the issue with "pinot-pulsar" module (potentially library conflicts)

aleksdikanski edited a comment on issue #7270:
URL: https://github.com/apache/pinot/issues/7270#issuecomment-998128028


   Sure @KKcorps, I basically followed the guides on the Pulsar and Pinot websites:
   - set up a Pulsar standalone cluster using https://pulsar.apache.org/docs/en/standalone-docker/#start-pulsar-in-docker
     **Please note** that, for this demo setup, I ran the Pulsar container in the same Docker network as the Pinot cluster and gave the Pulsar container a name (basically, add `--network <pinot-network> --name pulsar` to the `docker run` command)
   - set up a Pinot cluster using the manual setup https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker#manual-cluster
   - add a Pulsar topic (it didn't matter whether it was partitioned or not):
       ```
       $ docker run -it \
          --rm \
          --network <pinot-network> \
          -p 127.0.0.1:6650:6650 \
          -p 127.0.0.1:8080:8080 \
          apachepulsar/pulsar:2.8.1 \
          bin/pulsar-admin --admin-url http://pulsar:8080 topics create persistent://public/default/pinot
       ```
   - add a table and schema to Pinot using the following table and schema declarations
        table.json
       ```
       {
         "tableName": "airlineStats",
         "tableType": "REALTIME",
         "tenants": {
           "broker": "DefaultTenant",
           "server": "DefaultTenant"
         },
         "segmentsConfig": {
           "schemaName": "airlineStats",
           "timeColumnName": "DaysSinceEpoch",
           "replication": "1",
           "replicasPerPartition": "1",
           "timeType": "DAYS",
           "retentionTimeUnit": "DAYS",
           "retentionTimeValue": "365",
           "segmentPushType": "APPEND",
           "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
         },
         "tableIndexConfig": {
           "loadMode": "MMAP",
           "streamConfigs": {
             "streamType": "pulsar",
             "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
             "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
             "stream.pulsar.consumer.type": "lowlevel",
             "stream.pulsar.topic.name": "pinot",
             "stream.pulsar.fetch.timeout.millis": "10000",
             "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
             "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
             "realtime.segment.flush.threshold.size": "10000",
             "realtime.segment.flush.threshold.time": "1h"
           }
         },
         "metadata": {}
       }
       ```
       schema.json
       ```
       {
           "metricFieldSpecs": [
           ],
           "dimensionFieldSpecs": [
             {
               "dataType": "INT",
               "name": "ActualElapsedTime"
             },
             {
               "dataType": "INT",
               "name": "AirTime"
             },
             {
               "dataType": "INT",
               "name": "AirlineID"
             },
             {
               "dataType": "INT",
               "name": "ArrDel15"
             },
             {
               "dataType": "INT",
               "name": "ArrDelay"
             },
             {
               "dataType": "INT",
               "name": "ArrDelayMinutes"
             },
             {
               "dataType": "INT",
               "name": "ArrTime"
             },
             {
               "dataType": "STRING",
               "name": "ArrTimeBlk"
             },
             {
               "dataType": "INT",
               "name": "ArrivalDelayGroups"
             },
             {
               "dataType": "INT",
               "name": "CRSArrTime"
             },
             {
               "dataType": "INT",
               "name": "CRSDepTime"
             },
             {
               "dataType": "INT",
               "name": "CRSElapsedTime"
             },
             {
               "dataType": "STRING",
               "name": "CancellationCode"
             },
             {
               "dataType": "INT",
               "name": "Cancelled"
             },
             {
               "dataType": "STRING",
               "name": "Carrier"
             },
             {
               "dataType": "INT",
               "name": "CarrierDelay"
             },
             {
               "dataType": "INT",
               "name": "DayOfWeek"
             },
             {
               "dataType": "INT",
               "name": "DayofMonth"
             },
             {
               "dataType": "INT",
               "name": "DepDel15"
             },
             {
               "dataType": "INT",
               "name": "DepDelay"
             },
             {
               "dataType": "INT",
               "name": "DepDelayMinutes"
             },
             {
               "dataType": "INT",
               "name": "DepTime"
             },
             {
               "dataType": "STRING",
               "name": "DepTimeBlk"
             },
             {
               "dataType": "INT",
               "name": "DepartureDelayGroups"
             },
             {
               "dataType": "STRING",
               "name": "Dest"
             },
             {
               "dataType": "INT",
               "name": "DestAirportID"
             },
             {
               "dataType": "INT",
               "name": "DestAirportSeqID"
             },
             {
               "dataType": "INT",
               "name": "DestCityMarketID"
             },
             {
               "dataType": "STRING",
               "name": "DestCityName"
             },
             {
               "dataType": "STRING",
               "name": "DestState"
             },
             {
               "dataType": "INT",
               "name": "DestStateFips"
             },
             {
               "dataType": "STRING",
               "name": "DestStateName"
             },
             {
               "dataType": "INT",
               "name": "DestWac"
             },
             {
               "dataType": "INT",
               "name": "Distance"
             },
             {
               "dataType": "INT",
               "name": "DistanceGroup"
             },
             {
               "dataType": "INT",
               "name": "DivActualElapsedTime"
             },
             {
               "dataType": "INT",
               "name": "DivAirportIDs",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivAirportLandings"
             },
             {
               "dataType": "INT",
               "name": "DivAirportSeqIDs",
               "singleValueField": false
             },
             {
               "dataType": "STRING",
               "name": "DivAirports",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivArrDelay"
             },
             {
               "dataType": "INT",
               "name": "DivDistance"
             },
             {
               "dataType": "INT",
               "name": "DivLongestGTimes",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivReachedDest"
             },
             {
               "dataType": "STRING",
               "name": "DivTailNums",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivTotalGTimes",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivWheelsOffs",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "DivWheelsOns",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "Diverted"
             },
             {
               "dataType": "INT",
               "name": "FirstDepTime"
             },
             {
               "dataType": "STRING",
               "name": "FlightDate"
             },
             {
               "dataType": "INT",
               "name": "FlightNum"
             },
             {
               "dataType": "INT",
               "name": "Flights"
             },
             {
               "dataType": "INT",
               "name": "LateAircraftDelay"
             },
             {
               "dataType": "INT",
               "name": "LongestAddGTime"
             },
             {
               "dataType": "INT",
               "name": "Month"
             },
             {
               "dataType": "INT",
               "name": "NASDelay"
             },
             {
               "dataType": "STRING",
               "name": "Origin"
             },
             {
               "dataType": "INT",
               "name": "OriginAirportID"
             },
             {
               "dataType": "INT",
               "name": "OriginAirportSeqID"
             },
             {
               "dataType": "INT",
               "name": "OriginCityMarketID"
             },
             {
               "dataType": "STRING",
               "name": "OriginCityName"
             },
             {
               "dataType": "STRING",
               "name": "OriginState"
             },
             {
               "dataType": "INT",
               "name": "OriginStateFips"
             },
             {
               "dataType": "STRING",
               "name": "OriginStateName"
             },
             {
               "dataType": "INT",
               "name": "OriginWac"
             },
             {
               "dataType": "INT",
               "name": "Quarter"
             },
             {
               "dataType": "STRING",
               "name": "RandomAirports",
               "singleValueField": false
             },
             {
               "dataType": "INT",
               "name": "SecurityDelay"
             },
             {
               "dataType": "STRING",
               "name": "TailNum"
             },
             {
               "dataType": "INT",
               "name": "TaxiIn"
             },
             {
               "dataType": "INT",
               "name": "TaxiOut"
             },
             {
               "dataType": "INT",
               "name": "Year"
             },
             {
               "dataType": "INT",
               "name": "WheelsOn"
             },
             {
               "dataType": "INT",
               "name": "WheelsOff"
             },
             {
               "dataType": "INT",
               "name": "WeatherDelay"
             },
             {
               "dataType": "STRING",
               "name": "UniqueCarrier"
             },
             {
               "dataType": "INT",
               "name": "TotalAddGTime"
             }
           ],
           "dateTimeFieldSpecs": [
             {
               "name": "DaysSinceEpoch",
               "dataType": "INT",
               "format": "1:DAYS:EPOCH",
               "granularity": "1:DAYS"
             }
           ],
           "schemaName": "airlineStats"
       }
       ```
   - send a JSON message from the airline stats example using pulsar-client
      airlinestats00.json
       ```
       {"Quarter":1,"FlightNum":1,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,
         "DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,
         "DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,
         "DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA",
         "DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,
        "LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,
        "DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York",
        "FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY",
        "OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,
       "OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238, 
       "ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS",
       "DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD",
       "ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,
       "Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,
       "TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,
       "TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,
       "Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
       ```
       send this message using the producer client
       ```
        $ docker run -it \
           --rm \
           --network <pinot-network> \
           -v <path/to/dir/containing_airlinestats00.json>:/pulsar/airline \
           apachepulsar/pulsar:2.8.1 \
           bin/pulsar-client --url pulsar://pulsar:6650 produce -f airline/airlinestats00.json -k 0 -n 1 pinot
       ```
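
Before producing the message, it can help to check the JSON file against the schema locally. The following is a minimal Python sketch, not part of Pinot or Pulsar: the helper names `schema_columns` and `check_message` are hypothetical, and the checks (valid JSON, keys present in the schema, single-value versus multi-value shape) are assumptions about what is worth verifying, not a description of what Pinot itself validates.

```python
import json


def schema_columns(schema):
    """Map each column name to its field spec, across all field-spec lists."""
    columns = {}
    for key in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        for spec in schema.get(key, []):
            columns[spec["name"]] = spec
    return columns


def check_message(schema, raw_message):
    """Return a list of findings; an empty list means nothing suspicious was found."""
    try:
        message = json.loads(raw_message)
    except json.JSONDecodeError as exc:
        # Covers, for example, a string literal broken across a line break.
        return [f"invalid JSON: {exc.msg} (line {exc.lineno}, column {exc.colno})"]
    if not isinstance(message, dict):
        return ["top-level JSON value is not an object"]

    columns = schema_columns(schema)
    findings = []
    for key, value in message.items():
        spec = columns.get(key)
        if spec is None:
            findings.append(f"key not in schema: {key!r}")
            continue
        # Columns are treated as single-valued unless singleValueField is false.
        multi_value = spec.get("singleValueField", True) is False
        if value is not None and multi_value != isinstance(value, list):
            findings.append(f"single/multi-value mismatch: {key!r}")
    return findings
```

With the files from the steps above, this could be called as `check_message(json.load(open("schema.json")), open("airlinestats00.json").read())`; null values are skipped because the sample message uses them for many columns.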
   
   Funnily enough, I tested this today and actually got another error, different from the two mentioned above:
   ```
   2021/12/20 16:26:21.807 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: airlineStats_REALTIME, partition: airlineStats__0__0__20211220T1626Z
   java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
   at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
   at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
   at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906] 
   at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
   at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
   at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1209) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:344) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:164) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:86) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
   at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
   at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
   at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
   at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
   at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
   It is also related to the MessageId parsing and was also fixed by my implementation.
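
For readers unfamiliar with the exception text in the trace: a protobuf parser reports that the input "ended unexpectedly in the middle of a field" when the byte array runs out before a field has been fully decoded. The sketch below is a hand-rolled Python illustration of that failure mode with a simplified reader that only handles varint fields. It is neither Pulsar's nor protobuf's code, the two-field layout is an assumption chosen for illustration, and it does not claim to reproduce the root cause of this issue.

```python
class TruncatedInput(ValueError):
    """Raised when the buffer ends before a field is fully decoded."""


def encode_varint(value):
    """Encode a non-negative integer as a protobuf-style base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_fields(fields):
    """Encode {field_number: non-negative int} as varint fields (wire type 0)."""
    out = bytearray()
    for number, value in fields.items():
        out += encode_varint(number << 3)  # tag = field number << 3 | wire type 0
        out += encode_varint(value)
    return bytes(out)


def decode_varint(data, pos):
    """Decode one varint starting at pos; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(data):
            raise TruncatedInput("input ended unexpectedly in the middle of a field")
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_fields(data):
    """Decode a buffer of varint fields back into {field_number: value}."""
    fields = {}
    pos = 0
    while pos < len(data):
        tag, pos = decode_varint(data, pos)
        if tag & 0x07 != 0:
            raise ValueError("this sketch only handles varint fields")
        value, pos = decode_varint(data, pos)
        fields[tag >> 3] = value
    return fields
```

Decoding the full output of `encode_fields({1: 300, 2: 7})` returns the original fields, while decoding the same bytes with the last byte cut off raises `TruncatedInput`, because the tag for field 2 is read but its value is missing.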


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
