Posted to by Pinot Slack Email Digest <> on 2022/04/15 02:00:26 UTC

Apache Pinot Daily Email Digest (2022-04-14)

### _#general_

 **@ysuo:** Hi, if I tag a server and a broker with tenanta, then modify
the table's tenants config to use the tenanta broker and tenanta server, and
finally rebalance brokers and servers, will the table's index data be moved to
the server tagged tenanta? In my test, the index data for this table is
still located on the previous server.  
**@mayanks:** Yes, rebalance will do the move. Are you saying that the data is
not moved at all, or that the old server “also” has the data apart from the
new one?  
**@ysuo:** the data is not moved at all.  
**@mayanks:** When you try the dry run, does it give you a new ideal state
that is different?  
**@ysuo:** 8216 is my tagged server  
**@mayanks:** 8216 is new or old?  
**@ysuo:** new  
**@mayanks:** What’s the replication?  
**@ysuo:** but index data is still in 8212.  
**@ysuo:** 1 replication  
**@ysuo:** and no index data in 8216  
**@mayanks:** Then there is no way to move it without downtime  
**@ysuo:** ok  
**@ysuo:** I see  
**@mayanks:** I agree that the rebalance command should give back that
feedback. You can file a GH issue for that  
**@ysuo:** It’s moved to 8216 now. Thanks. And will the index data in 8212 be
deleted automatically?  
**@mayanks:** yes  
**@ysuo:** Does the RetentionManager task do the job? So I'll just wait a
moment and check whether it's deleted?  
**@mayanks:** Is it not deleted by rebalance? cc: @jackie.jxt  
**@ysuo:** I checked the files in 8212. Only empty folders are left; the index
data is already gone.  
**@mayanks:** Ok, then we are good  
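Why replication 1 forces downtime can be illustrated with a simplified Java model of a single no-downtime rebalance step. This is not Pinot's actual rebalancer: it assumes a step must keep at least `minAvailableReplicas` of the currently serving hosts (a name borrowed from Pinot's rebalance options; its exact semantics here are an assumption) and fills the remaining replica slots from the target hosts. Servers `8212` and `8216` come from the thread; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of ONE no-downtime rebalance step for a single segment.
// Not Pinot's TableRebalancer: it only illustrates that at least
// minAvailableReplicas of the currently serving hosts must stay in place.
class RebalanceStepSketch {

  static Set<String> nextStep(Set<String> current, Set<String> target,
                              int numReplicas, int minAvailableReplicas) {
    Set<String> next = new TreeSet<>();
    // Hosts that are already part of the target assignment can stay.
    for (String host : current) {
      if (target.contains(host)) {
        next.add(host);
      }
    }
    // Keep additional current hosts until enough replicas remain online.
    for (String host : new TreeSet<>(current)) {
      if (next.size() >= minAvailableReplicas) {
        break;
      }
      next.add(host);
    }
    // Fill the remaining replica slots from the target hosts.
    for (String host : new TreeSet<>(target)) {
      if (next.size() >= numReplicas) {
        break;
      }
      next.add(host);
    }
    return next;
  }

  public static void main(String[] args) {
    Set<String> current = new TreeSet<>(Arrays.asList("8212"));
    Set<String> target = new TreeSet<>(Arrays.asList("8216"));
    // Replication 1, one replica must stay online: no progress is possible.
    System.out.println(nextStep(current, target, 1, 1)); // [8212]
    // Replication 1 with downtime allowed (zero replicas must stay online).
    System.out.println(nextStep(current, target, 1, 0)); // [8216]
  }
}
```

In this model, with a single replica the step returns the old server unchanged, so only a downtime rebalance moves the segment; with two replicas (e.g. current `{A, B}`, target `{C, D}`), one old copy keeps serving while the other moves.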
 **@ysuo:** Hi, when ingesting Kafka stream data into a table, Pinot initially
creates the same number of segments as the number of topic partitions. Can I
adjust the number of segments independently of the topic partition count?  
**@kharekartik:** By adjusting, do you mean fixing the number of segments to
equal the number of partitions, OR just appending some number suffix?  
**@ysuo:** If a topic has 100 partitions, can Pinot create just 20 segments
when the table is initially created?  
**@ysuo:** I mean, if a topic has 100 partitions, 100 partition consumers will
be created, right? Can partition consumer numbers be adjusted?  
**@kharekartik:** No, that won't be possible. For the low-level consumer, each
partition is consumed independently of the others and goes into a different
**@kharekartik:** We may be able to do this in the future with the already-added
PartitionGroup classes, whereby a few partitions can be mapped to a single consumer.
Currently, though, there is no plan for doing that  
**@g.kishore:** Alice.. any reason why you want to do this?  
**@g.kishore:** You can always merge segments into bigger segments later  
**@g.kishore:** Maintaining one segment per partition has a lot of benefits  
**@ysuo:** Due to limited resources (CPUs). If a topic has 100 partitions and I
want to create 5 tables that consume the same topic, will 500 partition
consumers be created? Any suggestions I can refer to when designing my table?  
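Assuming the low-level consumer model described above, where each replica of each real-time table consumes every stream partition independently, the consumer count in this question is a simple product. The sketch below is back-of-the-envelope arithmetic with hypothetical names, not a Pinot API.

```java
// Back-of-the-envelope sketch, assuming the low-level (partition-level)
// consumer model: every replica of every real-time table runs one consumer
// per stream partition. All inputs are illustrative.
class ConsumerCountSketch {

  static long totalConsumers(int streamPartitions, int tables, int replicasPerPartition) {
    return (long) streamPartitions * tables * replicasPerPartition;
  }

  public static void main(String[] args) {
    // 100 partitions, 5 tables reading the same topic, 1 replica each.
    System.out.println(totalConsumers(100, 5, 1)); // 500
    // The same setup with 2 replicas doubles the count.
    System.out.println(totalConsumers(100, 5, 2)); // 1000
  }
}
```

How those consumers are spread across servers (and therefore the CPU load per server) depends on the server tenant, which this sketch ignores.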
**@ysuo:** Thanks, but how can I do this? To merge segments into bigger
**@mark.needham:** One way to merge them is once they're in an offline table
you can use the merge rollup task -  
**@ysuo:** Ok, I’ll try. Thanks. @mark.needham  
**@mark.needham:** there's also a video explaining how it works in more detail
**@ysuo:** Thanks, that’s very helpful.  
**@g.kishore:** See real-time to offline minion task  
**@mark.needham:** that one is described here -  
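The "merge segments into bigger ones later" suggestion can be illustrated with a simplified packing sketch: consecutive small segments are grouped until a record cap would be exceeded. This is not the merge/rollup task's algorithm; as far as we understand, the real task also groups segments by time bucket, and `maxRecordsPerSegment` here is a hypothetical parameter. The numbers mirror the 100-partition, 20-segment example above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of packing many small segments into fewer large
// ones. NOT the merge/rollup task's real algorithm; the cap is hypothetical.
class SegmentMergeSketch {

  // Groups consecutive segments (given by record count) into merged segments
  // of at most maxRecordsPerSegment records; an oversized input stays alone.
  static List<List<Integer>> pack(List<Integer> segmentRecordCounts, int maxRecordsPerSegment) {
    List<List<Integer>> merged = new ArrayList<>();
    List<Integer> group = new ArrayList<>();
    long groupTotal = 0;
    for (int records : segmentRecordCounts) {
      // Close the current group when adding this segment would exceed the cap.
      if (!group.isEmpty() && groupTotal + records > maxRecordsPerSegment) {
        merged.add(group);
        group = new ArrayList<>();
        groupTotal = 0;
      }
      group.add(records);
      groupTotal += records;
    }
    if (!group.isEmpty()) {
      merged.add(group);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Integer> small = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      small.add(50_000); // 100 small segments, one per partition
    }
    // Five 50,000-record segments fit under a 250,000-record cap.
    System.out.println(pack(small, 250_000).size()); // 20
  }
}
```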
**@octchristmas:** I'm looking for a way to delete (a row) and change data in
Pinot. For example, if a member withdraws, all of that member's data must be
deleted immediately.
- I can replace segments of a full period in an offline table.
- In realtime tables I would use UPSERT mode. I can upsert null values, but
then I can't use the star-tree index.
Can I delete without using UPSERT mode? Is there a way to delete a row from a
segment of an offline + realtime table in Pinot?  
**@mark.needham:** Hey - you can't delete individual rows. As you said, you
can only delete at the segment level for both offline and real-time tables.  
**@richard892:** @octchristmas I'm not a GDPR expert but if you get a GDPR
request is it enough to make it impossible to _retrieve_ data for that user?  
**@richard892:** because if that's enough (I know there are strategies in some
frameworks like encrypting the data and throwing away the encryption key when the
request comes in) we could easily make an indexing feature to support this  
**@richard892:** essentially we could apply a mask to the data meaning "don't
read this row" but which would keep the offline segment format immutable  
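The masking idea can be sketched as a per-segment bitmap of row IDs that readers skip, leaving the immutable rows untouched. This is a hypothetical illustration of the proposal, not an existing Pinot feature, and all names are made up. Masked rows are still physically present, which is exactly the GDPR question being asked here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical "don't read this row" mask over an immutable segment.
// Not a Pinot feature; names are illustrative only.
class DeletionMaskSketch {

  private final List<String> rows;            // stands in for an immutable segment
  private final BitSet masked = new BitSet(); // bit i set => row i is hidden

  DeletionMaskSketch(List<String> rows) {
    this.rows = Collections.unmodifiableList(new ArrayList<>(rows));
  }

  // Hide a row without rewriting the segment data.
  void mask(int rowId) {
    masked.set(rowId);
  }

  // A scan returns only rows whose mask bit is clear.
  List<String> visibleRows() {
    return IntStream.range(0, rows.size())
        .filter(rowId -> !masked.get(rowId))
        .mapToObj(rows::get)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    DeletionMaskSketch segment =
        new DeletionMaskSketch(Arrays.asList("member-1", "member-2", "member-3"));
    segment.mask(1); // hide member-2's row
    System.out.println(segment.visibleRows()); // [member-1, member-3]
  }
}
```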
**@octchristmas:** @mark.needham Thanks for the answer. How do I replace the
segments (consuming and completed) of a realtime table?  
**@octchristmas:** @richard892 Thanks for the answer, are encryption keys
managed per user? I want to know more. Do you have any documentation on this?  
**@richard892:** sorry I was asking about your requirements, not describing a
**@mayanks:** @octchristmas The minion purge task can be used for GDPR purging  
**@octchristmas:** @richard892 I have to comply with the GDPR, and the best
way is to delete the individual rows.  
**@mayanks:** Yes minion purge task for that @octchristmas  
**@octchristmas:** @mayanks Thanks so much! :heart_eyes: I checked Pinot's
Purge Task, but I didn't mention it because I wanted to see if there was any
other way. However, I think PurgeTask is the only way, so I have a few
questions about it.
Q1) Can PurgeTask also delete individual rows in the committed (not consuming)
segments of a realtime table?
Q2) PurgeTask does not seem to delete individual rows by randomly accessing
segment files. If PurgeTask downloads, regenerates, and uploads segment files,
what is the difference from an ingestion job? I am trying to understand this
difference. We would prefer an ingestion job over using Minion and implementing
task code, because we are more familiar with developing ingestion jobs.
Q3) We serve large amounts of data and large numbers of segments. If PurgeTask
works by downloading and regenerating segments, the regeneration and reloading
of segments will likely affect the cluster or services, regardless of whether
PurgeTask or an ingestion job is used. How will the cluster or service be
affected?  
**@mayanks:** 1. Purge task is for offline tables  
**@mayanks:** 2. It is smarter: it avoids regenerating a segment if nothing
changed. It also takes away the burden of maintaining another ingestion
pipeline. But essentially it is the same  
**@mayanks:** 3. Download/upload of data should not impact cluster
performance. How much data are we talking about?  
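The purge behavior described in answer 2 (filter out matching rows, and skip regeneration when nothing changed) can be sketched as below. The `Row` type, the predicate, and the method names are hypothetical stand-ins; this does not use Pinot's purge task API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical purge-style pass over one immutable segment: drop rows that
// match a predicate, and only produce new rows when something was removed.
class PurgeSketch {

  // Stand-in for a segment row; not a Pinot class.
  static final class Row {
    final String memberId;
    final long value;

    Row(String memberId, long value) {
      this.memberId = memberId;
      this.value = value;
    }
  }

  static Optional<List<Row>> purge(List<Row> segmentRows, Predicate<Row> shouldPurge) {
    List<Row> kept = new ArrayList<>();
    for (Row row : segmentRows) {
      if (!shouldPurge.test(row)) {
        kept.add(row);
      }
    }
    // Nothing matched: skip regeneration and leave the segment as-is.
    if (kept.size() == segmentRows.size()) {
      return Optional.empty();
    }
    // Otherwise a caller would rebuild and re-upload a segment from `kept`.
    return Optional.of(kept);
  }

  public static void main(String[] args) {
    List<Row> rows = new ArrayList<>();
    rows.add(new Row("member-1", 1));
    rows.add(new Row("member-2", 2));
    Predicate<Row> withdrawn = row -> row.memberId.equals("member-2");
    System.out.println(purge(rows, withdrawn).map(List::size).orElse(-1)); // 1
    System.out.println(purge(rows, row -> false).isPresent()); // false
  }
}
```

The empty `Optional` is what lets a purge-style job leave unaffected segments alone, instead of rewriting every segment the way a full re-ingestion would.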
 **@gxm.monica:** Hi everyone, I was trying to use Spark to do batch
ingestion, but I got an error like this when I executed it:
```
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '533ddba' at 'null' in 'null'
Exception in thread "main" java.lang.ExceptionInInitializerError
    at <init>(
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(
    at org.apache.spark.util.Utils$.classForName(Utils.scala:237)
    at $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:813)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:927)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:936)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.NoSuchElementException
    at java.util.ServiceLoader$LazyIterator.nextService(
    at java.util.ServiceLoader$
    at java.util.ServiceLoader$
    ... 12 more
```
It seems like Spark couldn't find
`` from the Kafka
plugin. I built Pinot from source on the `master` branch using this command
(because we use JDK 8 on our machines): ```mvn clean install -DskipTests -Pbin-dist -T 4 -Djdk.version=8```
My Spark job uses a command like this, in which I've
set `-DPlugins.dir` according to : ```export PINOT_VERSION=0.10.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin echo
dependencies.jar cd ${PINOT_DISTRIBUTION_DIR} ${SPARK_HOME}/bin/spark-submit \
--class \ --master
"local[2]" \ --deploy-mode client \ --conf
\ --conf
\ --conf
log4j2.xml" \ --conf
all-${PINOT_VERSION}-jar-with-dependencies.jar" \
dependencies.jar \ LaunchDataIngestionJob \ -jobSpecFile
Is it because Spark couldn't find my plugins' jars from `plugins.dir`? I'm not
familiar with Spark; do I need to add all the plugins' jars to the Spark classpath
using `--jars` or something? Could you help me?  
**@kharekartik:** Hi. Can I understand why you need to use Kafka in batch
ingestion? Also, can you share the ingestion spec? In your spark-submit
command, the pinot-batch-ingestion-spark plugin is missing. It is located in
**@kharekartik:** Also can you specify the spark version  
**@kharekartik:** Also the --class needs to be
**@kharekartik:** You can view the full guide here -  
**@gxm.monica:** Hi @kharekartik, thank you for your help. I don't need to use
Kafka in batch ingestion. I used
`` as the main class before,
and it seems to load the static variable `SUBCOMMAND_MAP`, which eventually
caused the error in my question. Now I have changed my configuration following
the full guide you mentioned above. My new Spark job command is like this: ```export
export HADOOP_VERSION=2.7.2U17-11 export HADOOP_GUAVA_VERSION=11.0.2 export
PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin cd
${PINOT_DISTRIBUTION_DIR} ${SPARK_HOME}/bin/spark-submit \ --class \ --master
"local[2]" \ --deploy-mode client \ --conf
\ --conf
\ --conf
log4j2.xml" \ --conf
hdfs/pinot-hdfs-${PINOT_VERSION}-shaded.jar" \ local://
dependencies.jar \ -jobSpecFile
My new ingestion spec is like this: ```name: 'spark'
extraConfigs: stagingDir:  jobType: SegmentCreationAndTarPush inputDirURI: ''
includeFileNamePattern: 'glob:**/*.csv' outputDirURI: '' overwriteOutput: true
pinotFSSpecs: - scheme: hdfs className:
org.apache.pinot.plugin.filesystem.HadoopPinotFS recordReaderSpec: dataFormat:
'csv' className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig' tableSpec:
tableName: 'transcript' pinotClusterSpecs: \- controllerURI: '' pushJobSpec:
pushAttempts: 2 pushRetryIntervalMillis: 1000``` My Spark version is `2.4.5`.
When I executed it, I got an error like this:
```
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '70dea4e' at 'null' in 'null'
Exception in thread "main" java.lang.NoSuchMethodException:[Ljava.lang.String;)
    at java.lang.Class.getMethod(
    at
    at $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
It seems like
``
doesn't have a `main` method?  
**@kharekartik:** Yes, that was a bug, which is fixed now. Can you pull the
latest code from master and build it?  
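**@kharekartik:** Or you can simply add the following to the class:
```
public static void main(String[] args) {
  // Load Pinot plugins (input formats, file systems, ...) before running the command.
  PluginManager.get().init();
  // Parse the CLI arguments and run the ingestion job command.
  (new CommandLine(new LaunchDataIngestionJobCommand())).execute(args);
}
```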
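The `NoSuchMethodException` in the trace above comes from spark-submit reflectively looking up a `public static void main(String[])` on the class passed to `--class`, which is why adding that method fixes it. A minimal sketch of roughly that lookup, using stand-in nested classes rather than the actual Pinot command class (all names here are hypothetical):

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Sketch of the entry-point check that fails with NoSuchMethodException:
// look up a public main(String[]) and require it to be static.
class MainMethodCheck {

  // Stand-in for a command class without an entry point.
  static class WithoutMain {
    public void run(String[] args) { }
  }

  // Stand-in for the same class after adding main.
  static class WithMain {
    public static void main(String[] args) { }
  }

  static boolean hasEntryPoint(Class<?> clazz) {
    try {
      Method main = clazz.getMethod("main", String[].class);
      return Modifier.isStatic(main.getModifiers());
    } catch (NoSuchMethodException e) {
      // This is the exception spark-submit surfaced in the trace above.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(hasEntryPoint(WithoutMain.class)); // false
    System.out.println(hasEntryPoint(WithMain.class));    // true
  }
}
```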
 **@achavan1:** @achavan1 has joined the channel  
 **@harish.bohara:** @harish.bohara has joined the channel  
 **@mikesheppard2:** @mikesheppard2 has joined the channel  
 **@harish.bohara:** Hi, if I have a large number of segments (for realtime
tables), is there a setting that merges segments in the background? Or any cron
**@mayanks:** Yep minion tasks  
**@harish.bohara:** It seems it is only for offline tables:  The Minion
merge/rollup task allows a user to *merge small segments into larger ones,
through which Pinot can potentially benefit from improved disk storage and the
query performance*. For complete motivation and reasoning, please refer to the
design doc above. Currently, we only support *OFFLINE table APPEND use cases*.  
**@mayanks:** You can use this in conjunction with managed offline flow  
**@harish.bohara:** :+1:  
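The managed offline flow moves real-time data into an offline table, where merge/rollup can then apply. As we understand it, the realtime-to-offline task processes one time bucket per run and waits for a buffer period after the bucket ends before moving it; the sketch below models only that window check. The class, method, and parameter names are hypothetical and this is not the minion task's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical model of a windowed realtime-to-offline hand-off: one time
// bucket per run, eligible only after bucket end + buffer has passed.
class RealtimeToOfflineWindowSketch {

  static final class Window {
    final Instant start; // inclusive
    final Instant end;   // exclusive

    Window(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }
  }

  static Optional<Window> nextWindow(Instant watermark, Duration bucket,
                                     Duration buffer, Instant now) {
    Instant windowEnd = watermark.plus(bucket);
    // Wait out the buffer so late-arriving rows for this bucket are included.
    if (windowEnd.plus(buffer).isAfter(now)) {
      return Optional.empty();
    }
    return Optional.of(new Window(watermark, windowEnd));
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2022-04-13T00:00:00Z");
    Duration bucket = Duration.ofDays(1);
    Duration buffer = Duration.ofHours(2);
    // The 2022-04-13 bucket becomes eligible at 2022-04-14T02:00Z.
    System.out.println(nextWindow(watermark, bucket, buffer,
        Instant.parse("2022-04-14T06:00:00Z")).isPresent()); // true
    System.out.println(nextWindow(watermark, bucket, buffer,
        Instant.parse("2022-04-14T01:00:00Z")).isPresent()); // false
  }
}
```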

###  _#random_

 **@achavan1:** @achavan1 has joined the channel  
 **@harish.bohara:** @harish.bohara has joined the channel  
 **@mikesheppard2:** @mikesheppard2 has joined the channel  

###  _#feat-presto-connector_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#feat-upsert_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#fraud_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#troubleshooting_

 **@achavan1:** @achavan1 has joined the channel  
 **@nrajendra434:** :wave: hi folks, I'm looking to understand how to get the
Pinot ingestion job working on EMR Spark 2.4 in cluster mode. I'm using Pinot
0.7.1 since the EMR cluster I'm working with runs on Java 8. The
following spark-submit works, and the Pinot segments are
generated when running in client mode. Here is the command to start it on the
master node ```sudo spark-submit --class --master
local --deploy-mode client --conf spark.local.dir=/mnt --conf
incubating-0.7.1-bin/plugins -Dplugins.include=pinot-s3,pinot-parquet
incubating-0.7.1-bin/conf/pinot-ingestion-job-log4j2.xml" --conf
parquet-0.7.1-shaded.jar" /mnt/pinot/apache-pinot-
-jobSpecFile /mnt/pinot/spark_job_spec_v8.yaml```
The ingestion spec used is this:
```
executionFrameworkSpec:
  name: 'spark'
  extraConfigs:
    stagingDir:
    dependencyJarDir: ''
jobType: SegmentCreation
inputDirURI: ''
includeFileNamePattern: 'glob:**/*.parquet'
outputDirURI: ''
overwriteOutput: true
pinotFSSpecs:
  - className: org.apache.pinot.plugin.filesystem.S3PinotFS
    scheme: s3
    configs:
      region: us-east-1
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
tableSpec:
  tableName: 'students'
  schemaURI: ''
  tableConfigURI: ''
```
But when running
this in cluster mode, I get a class-not-found error. The plugins.dir is
available on all the EMR nodes, and we can see that the plugins are
successfully loaded. I have tried passing the S3 location as well as the
/mnt path, and both fail with the same error. I looked at these two
previous posts  and  and they did not help in resolving it. Here is the error:
```
22/04/14 07:06:44 INFO PluginManager: Plugins root dir is [/mnt/pinot/apache-pinot-incubating-0.7.1-bin/plugins]
22/04/14 07:06:44 INFO PluginManager: Trying to load plugins: [[pinot-s3, pinot-parquet]]
22/04/14 07:06:44 INFO PluginManager: Trying to load plugin [pinot-s3] from location
system/pinot-s3]
22/04/14 07:06:44 INFO PluginManager: Successfully loaded plugin [pinot-s3] from jar file [/mnt/pinot/apache-pinot-
system/pinot-s3/pinot-s3-0.7.1-shaded.jar]
22/04/14 07:06:44 INFO PluginManager: Successfully Loaded plugin [pinot-s3] from dir
system/pinot-s3]
22/04/14 07:06:44 INFO PluginManager: Trying to load plugin [pinot-parquet] from location [/mnt/pinot/apache-pinot-incubating-0.7.1-bin/plugins/pinot-input-format/pinot-parquet]
22/04/14 07:06:44 INFO PluginManager: Successfully loaded plugin [pinot-parquet] from jar file [/mnt/pinot/apache-pinot-incubating-0.7.1-bin/plugins/pinot-input-format/pinot-parquet/pinot-parquet-0.7.1-shaded.jar]
22/04/14 07:06:44 INFO PluginManager: Successfully Loaded plugin [pinot-parquet] from dir
format/pinot-parquet]
22/04/14 07:06:45 ERROR LaunchDataIngestionJobCommand: Got exception to generate IngestionJobSpec for data ingestion job - Can't construct a java object for
exception=Class not found: org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
 in 'string', line 1, column 1:
    executionFrameworkSpec:
    ^
```
I'll post the different commands used to submit this job in the thread. Thank you for your help :bow:  
**@nrajendra434:** this command with the jars pointing to a local path ```sudo
spark-submit --class --deploy-mode cluster --jars /mnt/pinot/apache-pinot-incubating-0.7.1-bin/lib/pinot-
parquet-0.7.1-shaded.jar --files "/mnt/pinot/spark_job_spec_v8.yaml" --conf
incubating-0.7.1-bin/plugins -Dplugins.include=pinot-s3,pinot-parquet
incubating-0.7.1-bin/conf/pinot-ingestion-job-log4j2.xml" --conf
parquet-0.7.1-shaded.jar" --conf
parquet-0.7.1-shaded.jar"  -jobSpecFile spark_job_spec_v8.yaml``` and this
command pointing to jars on S3 ```sudo spark-submit --class --deploy-mode
cluster --jars  --files "/mnt/pinot/spark_job_spec_v8.yaml" --conf
incubating-0.7.1-bin/plugins -Dplugins.include=pinot-s3,pinot-parquet
-Dlog4j2.configurationFile=" --conf "spark.driver.extraClassPath=" --conf
"spark.executor.extraClassPath="  -jobSpecFile spark_job_spec_v8.yaml``` both
are failing  
**@nrajendra434:** I tried including the jars in both driver.extraClassPath
and executor.extraClassPath. Neither helped.  
 **@harish.bohara:** @harish.bohara has joined the channel  
 **@mikesheppard2:** @mikesheppard2 has joined the channel  

###  _#onboarding_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#aggregators_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#community_

 **@glenn393:** @glenn393 has joined the channel  

###  _#presto-pinot-connector_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#pinot-perf-tuning_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#thirdeye-pinot_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#getting-started_

 **@achavan1:** @achavan1 has joined the channel  
 **@harish.bohara:** @harish.bohara has joined the channel  
 **@mikesheppard2:** @mikesheppard2 has joined the channel  

###  _#pinot-docsrus_

 **@mitchellh:** @mitchellh has joined the channel  

###  _#pinot-trino_

 **@mitchellh:** @mitchellh has joined the channel  