You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by Pinot Slack Email Digest <sn...@apache.org> on 2021/05/24 02:00:19 UTC

Apache Pinot Daily Email Digest (2021-05-23)

### _#general_

  
 **@sahoo.skumar:** @sahoo.skumar has joined the channel  

###  _#random_

  
 **@sahoo.skumar:** @sahoo.skumar has joined the channel  

###  _#troubleshooting_

  
 **@sahoo.skumar:** @sahoo.skumar has joined the channel  

###  _#getting-started_

  
 **@mohit.asingh:** @mohit.asingh has joined the channel  
 **@mohit.asingh:** Hello Everyone.. i am trying to inject data from kafka
topic to apache pinot but i didn't see any data loaded do i am missing
anything in config related to avro ? *Schema* ```{ "schemaName":
"test_schema", "dimensionFieldSpecs": [ { "name": "client_id", "dataType":
"STRING" }, { "name": "master_property_id", "dataType": "INT" }, { "name":
"business_unit", "dataType": "STRING" }, { "name": "error_info_str",
"dataType": "STRING" } ], "dateTimeFieldSpecs": [ { "name": "timestamp",
"dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity":
"1:MILLISECONDS" } ] }``` Table: ```{ "REALTIME": { "tableName":
"test_schema_REALTIME", "tableType": "REALTIME", "segmentsConfig": {
"schemaName": "test_schema", "replication": "1", "replicasPerPartition": "1",
"timeColumnName": "timestamp" }, "tenants": { "broker": "DefaultTenant",
"server": "DefaultTenant", "tagOverrideConfig": {} }, "tableIndexConfig": {
"bloomFilterColumns": [], "noDictionaryColumns": [],
"onHeapDictionaryColumns": [], "varLengthDictionaryColumns": [],
"enableDefaultStarTree": false, "enableDynamicStarTreeCreation": false,
"aggregateMetrics": false, "nullHandlingEnabled": false,
"invertedIndexColumns": [], "rangeIndexColumns": [],
"autoGeneratedInvertedIndex": false,
"createInvertedIndexDuringSegmentGeneration": false, "sortedColumn": [],
"loadMode": "MMAP", "streamConfigs": { "streamType": "kafka",
"stream.kafka.topic.name": "TestTopic", "stream.kafka.broker.list":
"localhost:9092", "stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"schema.registry.url": "", "realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.segment.size": "100M" } }, "metadata": {}, "quota":
{}, "routing": {}, "query": {}, "ingestionConfig": { "transformConfigs": [ {
"columnName": "error_info_str", "transformFunction": "json_format(error_info)"
} ] }, "isDimTable": false } }``` Kafka Avro Schema: ``` { "type": "record",
"name": "TestRecord", "namespace": "com.test.ns", "fields": [ {"name":
"client_id", "type": ["null","string"]}, {"name": "master_property_id",
"type": "int"}, {"name": "business_unit", "type": ["null","string"]}, {"name":
"error_info", "type": { "type": "record", "name": "ErrorInfo", "fields": [
{"name": "code", "type": ["null","string"]}, {"name": "description", "type":
["null","string"]} ] } }, {"name": "timestamp", "type": ["null","long"],
"default": null} ] }```  
**@mohit.asingh:** ```Caught exception while decoding row, discarding row.
Payload is9899�$UNIT-1621781307861E-12345,Some error description�����^
java.io.CharConversionException: Invalid UTF-32 character 0x2010839 (above
0x0010ffff) at char #1, byte #7) at
shaded.com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
~[pinot-all-0.7.1-jar-with-
dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6] at
shaded.com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
~[pinot-all-0.7.1-jar-with-
dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6] at
shaded.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:258)
~[pinot-all-0.7.1-jar-with-
dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6] at
shaded.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2353)
~[pinot-all-0.7.1-jar-with-
dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]``` i saw this
error in server looks like some problem to decode message from kafka topic.  
**@g.kishore:** is there anything else in the log?  
**@mohit.asingh:** its working now i missed one configuration
`"stream.kafka.decoder.prop.schema.registry.rest.url":`  
**@g.kishore:** Cool  
**@g.kishore:** Can you make a fix to decoder to warn about missing
properties?  
**@mohit.asingh:** @g.kishore just a quick point correct me if i am wrong.. we
have to add one timestamp field in schema that when creating realtime table
that timestamp will be use by retention of data etc..? in my case lets say i
don't have any timestamp in kafka avro schema but i added a field as
`timestamp` in apache pinot schema will it pick a default value automatically?  
**@g.kishore:** No.. you can probably use a udf to set it to now()  
**@mohit.asingh:** i was testing the same scenario and realised it take some
negative value not sure what it indicate in time of timestamp
`-9223372036854776000` now() do i need to use this in transform function?  
**@g.kishore:** Yes  
**@mohit.asingh:** its throwing error if i add now() in transform function for
timestamp. Invalid table config: test_schema_now_REALTIME. Invalid transform
function `'now()'` for column '`timestamp`'  

###  _#feat-partial-upsert_

  
 **@jackie.jxt:** @qiaochu Can you try rebuild the project and see if the
problem still exist?  
 **@jackie.jxt:** It should not have problem because it is part of the CI test
on github  
\--------------------------------------------------------------------- To
unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org For additional commands,
e-mail: dev-help@pinot.apache.org