Posted to dev@pinot.apache.org by Pinot Slack Email Digest <ap...@gmail.com> on 2022/04/29 02:00:27 UTC

Apache Pinot Daily Email Digest (2022-04-28)

### _#general_

  
 **@ysuo:** Hi team, can Pinot ingest just one partition of a Kafka topic that
has many partitions?  
**@mayanks:** If you mean you want to ignore all other partitions, you might
be able to achieve it by having filter functions in ingestion. However, this
is not a setup that I would recommend. What’s the use case?  
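For reference, Pinot's ingestion `filterConfig` evaluates record values rather than the Kafka partition itself, so this approach assumes the business is identifiable from a column in each record. A minimal sketch with a hypothetical `org_id` column; records for which the filter function returns true are dropped:
```
"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "Groovy({org_id != 'orgA'}, org_id)"
  }
}
```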
**@ysuo:** yes, like I can specify a partition number and Pinot can ingest
just that partition and save the data to one table.  
**@ysuo:** The use case is that different businesses' data is partitioned into
different Kafka partitions.  
**@jadami:** you can also filter on `$segmentName` which has some convention
to put the partition number in the segment name, but i doubt it’s fast enough.  
**@mayanks:** Right, but what’s the business case where you want to do this?
Note, Pinot scales ingestion by having parallel consumers (one per partition).
If you just have one partition to consume from you will lose out on that.  
**@mayanks:** I’d not recommend relying on the `$segmentName` convention, as
there is no contract to keep it consistent in future.  
**@ysuo:** Data from multiple businesses is merged and written to one Kafka
topic, and we depend on Pinot to separate the data into different tables. Hence
this topic is consumed repeatedly, which is resource-consuming.  
**@ysuo:** Is there any method to ingest one Kafka topic and write data into
different Pinot tables?  
**@mayanks:** What is the total data size across all partitions (per day)?  
**@ysuo:** That’s a key point. Maybe 0.15 billion per day.  
**@mayanks:** 0.15 B rows?  
**@ysuo:** yes  
**@mayanks:** You can just have a single table partitioned by org. It will be
much simpler.  
**@mayanks:** You can configure the same partition function in Pinot and Kafka,
and Pinot will only look at the relevant partition for a query, so it stays scalable  
**@mayanks:** it also avoids the operational overhead of multiple tables  
**@mayanks:** 150M rows per day is not a big deal  
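A hedged sketch of the partitioning setup described above, assuming a hypothetical `org_id` column that is also used as the Kafka message key with the default (murmur2-based) partitioner, which Pinot's `Murmur` partition function is meant to match; `numPartitions` here is illustrative, and the `routing` section enables partition-based segment pruning so a query filtering on `org_id` only hits the matching partition's segments:
```
"tableIndexConfig": {
  "segmentPartitionConfig": {
    "columnPartitionMap": {
      "org_id": {
        "functionName": "Murmur",
        "numPartitions": 32
      }
    }
  }
},
"routing": {
  "segmentPrunerTypes": ["partition"]
}
```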
**@ysuo:** I’m afraid one table is not ok. Different business data has
different fields.  
**@mayanks:** You can create multiple tables and have filter function in
ingestion that each table only ingests one transform, that is definitely
possible. But like I mentioned, it is not the most ideal/cost-effective
solution. Is there an option to repartition upstream?  
**@ysuo:** I see. "You can create multiple tables and have filter function in
ingestion that each table only ingests one transform, that is definitely
possible." Like you mentioned, that's what we want to do, and it was the cost
of doing it that way that prompted my question here.  
**@ysuo:** About repartition upstream, what’s your suggestion?:innocent:  
**@mayanks:** You could have a stream processing job upstream that takes your
Kafka stream and splits it into multiple topics (one per org), with multiple
partitions within each topic  
**@ysuo:** Thanks, Mayank. I like your idea and it makes things much easier.  
**@g.kishore:** Btw, we made some changes when we added support for Kinesis..
those changes will actually make it easier to add support for consuming a
subset of partitions..  
**@g.kishore:** I can point you to the code if you want to contribute this
feature  
**@ysuo:** Sure, I’d like to give it a try.  
 **@anishbabu.m:** @anishbabu.m has joined the channel  
 **@wcxzjtz:** Wondering if this works for Kafka streaming ingestion:
```
"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "brand_name_facility_id_tuple",
      "transformFunction": "concat(brand_name, facility_id, ':')"
    }
  ]
},
```
Not sure if `concat` works here; the examples here are mostly Groovy functions:  
**@npawar:** yup this should work. cc @mark.needham seems docs are incomplete
or causing some confusion?  
**@mark.needham:** Thanks - lemme add some more examples  
**@wcxzjtz:** got it. thanks @npawar and @mark.needham  
**@wcxzjtz:** Hey @npawar and @mark.needham, btw, what is the difference if I
put the transformFunction in `dimensionFieldSpecs` directly, like the following:  
**@wcxzjtz:** thanks  
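The snippet referenced above didn't make it into the digest; for context, a schema-level placement would look roughly like the sketch below, reusing the columns from the earlier example. Functionally both places define the same ingestion-time transform, but the `ingestionConfig.transformConfigs` form in the table config is the one the current docs emphasize, while `transformFunction` inside the schema field spec is the older mechanism:
```
"dimensionFieldSpecs": [
  {
    "name": "brand_name_facility_id_tuple",
    "dataType": "STRING",
    "transformFunction": "concat(brand_name, facility_id, ':')"
  }
]
```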
 **@buntyk91:** Hi All, I am working on deploying Apache Pinot on Kubernetes.
We have already deployed Apache Pinot on one Kubernetes cluster using the Helm
chart, and it has been working for the last month. Now we have to move the
deployment from one Kubernetes cluster to another, but we want to keep using
the same Kubernetes pods that are running in the first cluster, so we have
exposed Zookeeper from there. Where do I set the cluster URL when deploying in
the new Kubernetes cluster? I am pointing my new deployment to the old
Zookeeper because of the data its nodes are holding.  
 **@ysuo:** Hi team, is it possible for two tables that belong to different
server and broker tenants to have the same table name?  
**@npawar:** it is not possible atm. what’s motivating this requirement? at
the query side, we only refer to table name (no tenant), so if this were
hypothetically supported, you’d expect to change your query too to `select *
from tenant.tableName` ? just trying to understand what you had in mind  
**@ysuo:** Thanks. Just trying to figure out if it’s possible to use just one
cluster for dev, test and prod environments.  
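Since table names have to be unique across the cluster, the usual workaround for sharing one cluster is to suffix the table name per environment and isolate the workloads with tenants; a hedged sketch with hypothetical tenant and table names:
```
"tableName": "myTable_dev",
"tenants": {
  "broker": "DevBroker",
  "server": "DevServer"
}
```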

###  _#random_

  
 **@anishbabu.m:** @anishbabu.m has joined the channel  

###  _#troubleshooting_

  
 **@anishbabu.m:** @anishbabu.m has joined the channel  
 **@lars-kristian_svenoy:** Hello everyone. Thanks again for all your help
with everything so far. I have a question regarding upsert, and how to deal
with deduping for a certain scenario.. details in thread.  
**@lars-kristian_svenoy:** So we have multiple tables where, whenever new data
comes in, that event defines the entire state of the entity. Unfortunately,
because each incoming event is parsed into a 1:many relationship, we have no
reliable way of deduping the data: if we specify a primary key down to the
granularity of that relationship, we end up retaining data that was not
provided in the new event. I'm not entirely sure how to deal with this issue,
but one potential solution seems to be to use a JSON column type, allowing us
to store the many side of the relationship in it. Unfortunately, there are
limitations with that JSON type which make this infeasible. My question is
whether it is possible to achieve this at all using an upsert table, or if I
would need a custom batch job to do it. I had a thought that perhaps it would
be possible to store the event with the composite key in Kafka, but extract the
JSON payload during realtime ingestion. I'm assuming this doesn't work; any
thoughts?  
**@mayanks:** Trying to understand, is the requirement that with each new
event for the same key, we need to append to the JSON payload (or attributes)?
cc: @jackie.jxt  
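If every event really does carry the full state of the entity, one hedged option is to key the upsert table on just the entity id and fold the many side into a single JSON column, so each new event fully overwrites the previous state. A minimal sketch with hypothetical names, in two fragments:
-- schema (fragment)
```
"primaryKeyColumns": ["entity_id"],
"dimensionFieldSpecs": [
  { "name": "entity_id", "dataType": "STRING" },
  { "name": "relations", "dataType": "JSON" }
]
```
-- table config (fragment)
```
"upsertConfig": { "mode": "FULL" }
```
Upsert also requires the stream to be partitioned by the primary key, and the limitations with the JSON type mentioned above may still apply, so this is only a sketch of the direction rather than a confirmed fix.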
 **@humengyuk18:** I have some realtime tables' consuming segments in ERROR
state after an upgrade and restart; resetting the segments and restarting did
not fix the problem. Any suggestions?  
**@jackie.jxt:** Can you please check the server log and see if there is any
ERROR log?  
**@jackie.jxt:** Also, a kind reminder not to use `@here` in a channel with
thousands of people  
**@humengyuk18:** I see, we have some tables using time as a column, and there
are some errors in the transform function.  
**@humengyuk18:** I have one table’s consuming segment got deleted, any way I
can bring it back?  
**@npawar:** recovering from a deleted consuming segment involves some manual
steps.. we probably need to prioritize this issue:  
**@npawar:** is this a dev environment and can you just recreate the table?
also curious, how did it get deleted?  
**@humengyuk18:** It’s production. I thought deleting it would cause Pinot to
recreate the consuming segment.  
 **@abhijeet.kushe:** Hi Team, there was a production issue due to which we
lost records from 04/20 to 04/26. We create a segment each day. We have a log
of all the records. I wanted to know whether I can just directly stream those
records to Pinot. The eventTimestamp on which the Pinot realtime table is
configured would be between 04/20 and 04/26. So will the backdated entries in
the Pinot realtime table be captured in a current segment, or will they get
merged into the older segment which has been closed?  

###  _#thirdeye-pinot_

  
 **@madhumitamantri:** @madhumitamantri has joined the channel  

###  _#getting-started_

  
 **@anishbabu.m:** @anishbabu.m has joined the channel  
 **@octchristmas:** Hi, Pinot team! I am testing an ingestion job using
'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'. My problem
is that the binary type in the Parquet file is not converted to the string type
in Pinot; the data returned by a query in Pinot looks like a hex string.
What's the problem?

-- query result
```
"resultTable": {
  "dataSchema": {
    "columnNames": ["firstname", "gender", "lastname", "score", "studentid", "subject", "timestampinepoch"],
    "columnDataTypes": ["STRING", "STRING", "STRING", "INT", "INT", "STRING", "LONG"]
  },
  "rows": [
    ["4e6174616c6965", "46656d616c65", "4a6f6e6573", 3, 109, "4d61746873", 1647980000000]
  ]
},
```
-- parquet file schema
```
parquet-tools schema 6a4e9212ba501d90-c3a971300000000_1596454343_data.0.parq
message schema {
  optional int32 studentid;
  optional binary firstname;
  optional binary lastname;
  optional binary gender;
  optional binary subject;
  optional int32 score;
  optional int64 timestampinepoch;
}
```
-- ingestion job log
```
read value: {"studentid": 109, "firstname": "Natalie", "lastname": "Jones", "gender": "Female", "subject": "Maths", "score": 3, "timestampinepoch": 1647980000000}
Start building IndexCreator!
Finished records indexing in IndexCreator!
FileName set to metadata.properties
Base path set to /tmp/pinot-6a2e3b81-8eda-40c9-9a53-0d9cc03c85fd/output/tmp-8b8f9c6b-6a22-41d9-a16d-3eefe3d75d81
Finished segment seal!
Converting segment: /tmp/pinot-6a2e3b81-8eda-40c9-9a53-0d9cc03c85fd/output/batch_2022-03-22_2022-03-22 to v3 format
FileName set to metadata.properties
```
-- table
```
{
  "tableName": "transcript",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "schemaName": "transcript",
    "replication": 3,
    "timeColumnName": "timestampinepoch",
    "timeType": "MILLISECONDS"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentPushFrequency": "DAILY"
    }
  },
  "metadata": {}
}
```
-- schema
```
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    { "name": "studentid", "dataType": "INT" },
    { "name": "firstname", "dataType": "STRING" },
    { "name": "lastname", "dataType": "STRING" },
    { "name": "gender", "dataType": "STRING" },
    { "name": "subject", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "score", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [
    { "name": "timestampinepoch", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
  ]
}
```
**@g.kishore:** You can use a transformconfig during ingestion to convert hex
to string  
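A hedged sketch of what such a transform could look like, assuming the Parquet reader hands the raw byte[] value to the transform and writing the decoded value into a separate destination column (column names are hypothetical; the Groovy body just decodes the bytes as UTF-8):
```
"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "firstname_str",
      "transformFunction": "Groovy({new String(firstname, 'UTF-8')}, firstname)"
    }
  ]
}
```
As the rest of the thread shows, the root cause here was a missing UTF8 annotation, so fixing the writer side may be preferable to transforming at ingestion.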
**@octchristmas:** @g.kishore My sample data was a string, and I think it was
stored in a binary format when it was written as a Parquet file. When using
ParquetRecordReader in the ingestion job, should I always translate hex to string?  
**@g.kishore:** whats the parquet schema  
**@g.kishore:** this is the extractor code:
```
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
  if (originalType == OriginalType.UTF8) {
    return from.getValueToString(fieldIndex, index);
  }
  if (originalType == OriginalType.DECIMAL) {
    DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
    return binaryToDecimal(from.getBinary(fieldIndex, index), decimalMetadata.getPrecision(),
        decimalMetadata.getScale());
  }
  return from.getBinary(fieldIndex, index).getBytes();
```
**@g.kishore:** if your type is utf8, then we read it as string  
**@g.kishore:** else bytes  
**@octchristmas:** @g.kishore Thank you. That was the problem. Apache Impala
does not write the UTF8 annotation when saving Parquet files.  
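For comparison, a UTF8-annotated column typically shows up in parquet-tools output with the annotation (the exact rendering varies by parquet-tools version), which is what the extractor code above keys on:
```
optional binary firstname (UTF8);
optional binary firstname;
```
The first form is read as a String by the extractor; the second comes through as bytes, which ends up stored as a hex string when the Pinot column is declared STRING.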

###  _#flink-pinot-connector_

  
 **@npawar:** It should be, after this commit  
**@npawar:** @yupeng is there a user doc?  
 **@ysuo:** Thanks. Is there a timeline?  
 **@yupeng:** no user doc yet  
 **@yupeng:** but you can check out the readme  

###  _#introductions_

  
 **@anishbabu.m:** @anishbabu.m has joined the channel  
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org
For additional commands, e-mail: dev-help@pinot.apache.org