You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by Pinot Slack Email Digest <sn...@apache.org> on 2021/05/05 02:00:21 UTC

Apache Pinot Daily Email Digest (2021-05-04)

### _#general_

  
 **@huytc94:** @huytc94 has joined the channel  
 **@kartikaybagla:** @kartikaybagla has joined the channel  
 **@pedro.cls93:** Hello, are Pinot helm charts designed to define sensitive
information such as credentials for deep storage in configMaps?  
**@pedro.cls93:** Say I want to configure an S3 bucket for deep storage as
specified in:  Is there any way to define bucket credentials in Secrets in
order to not expose them in clear text through the ?  
**@dlavoie:** As of today, pinot properties are exclusively built from
configMap. Merging Pinot Properties between confimap and secrets would be a
nice addition to the helm chart. Feel free to contribute a PR if you feel like
it. Happy to provide implementation guidelines and reviews.  
**@dlavoie:** I’m also trying to motivate the Pinot community to introduce
Secret as a first class SPI into Pinot.  
**@dlavoie:** But that’s a long term effort :slightly_smiling_face:  
**@pedro.cls93:** Thank you for the information. What is SPI?  
**@pedro.cls93:** From what I know of Pinot, simply adding secret support is
not enough. Pinot needs to support configuration through environment variables
for secrets + configmaps to work if I understand correctly.  
**@dlavoie:** All pinot is support for now is properties. Which we can
generate on pod provisioning by merging the existing configMap with a k8s
secret.  
**@dlavoie:** Some specific deep store providers supports Env variable, but
that is managed by the underlying client lib, not Pinot. S3 for example.  
**@dlavoie:** > What is SPI? It `Service Provider Interface`. Anything that is
pluggable in Pinot is managed through predefined interfaces. We call those
SPI.  
**@pedro.cls93:** “All pinot is support for now is properties. Which we can
generate on pod provisioning by merging the existing configMap with a k8s
secret.” Is there any existing configuration that is generated this way from
which I could get some inspiration?  
**@dlavoie:** Actually, what I had in mind wouldn’t work. The secrets would
endup in the configfile anyway  
**@dlavoie:** What we would need is phase 2 of this refactor:  
**@dlavoie:** It would allow loading environment variables as Pinot properties  
**@dlavoie:** Hence we could configure secrets are environment variables  
**@pedro.cls93:** Looks exactly like what I was looking for. Is phase2 of this
a work in progress that I can follow?  
**@dlavoie:** Unfortunately this is not something I personally planned on
working. If you feel like contributing to Pinot, once again, I’ll be happy to
provide you pointer for phase 2.  
 **@riya.tyagi:** @riya.tyagi has joined the channel  
 **@jmeyer:** Hello What is the recommended (prod) way of ingesting batch data
without Hadoop ? I'm thinking about having a Python component generate parquet
files + copy on deepstore, and triggering an ingestion Something like the
`/ingestFromFile` API endpoint but prod-compatible (where can segment creation
be done in that case ? Minion ?) Thanks !  
**@ken:** I’m guessing Minion would work well for that use case, but I haven’t
tried that. We just run shell scripts to trigger the segment generation job,
which uses HDFS for input (csv) and output (segment) directories. Then a
script executes a “URI push” job, which will use hdfs URIs to do a more
efficient load of segments. Though you need to set up and use controller &
server config files to configure hdfs as a valid file system for URIs.  
**@jmeyer:** I see, thanks for the feedback @ken I'll have a look at using the
minion, otherwise we'll use a shell script as you did :slightly_smiling_face:  
**@mayanks:** We are working on a solution where Minion can do the ingestion,
but not ready yet. cc: @jackie.jxt  
 **@joshhighley:** in 'normal' sql queries, I can use an aggregate function
with * to select all columns: ```select sum(a+b), * from my_table ``` pinot
query browser gives an error when I try this -- is there another way without
specifically listing all the columns?  
**@jackie.jxt:** Currently pinot doesn't support mixing wildcard with other
columns. Can you please create a github issue for this?  
**@jackie.jxt:** Contributions are very welcome  
**@fx19880617:** this is not a valid sql though, unless you group by all the
columns(*)  
**@jackie.jxt:** Yes, `select a + b, * from myTable` is valid  
 **@mustafa:** @mustafa has joined the channel  
 **@avasudevan:** @avasudevan has joined the channel  
 **@mustafa:** Hi! Is there a way to have the data streamed from Kafka and
then put into S3 *in Parquet format?*  
**@mayanks:** You mean using Pinot? No  
**@mayanks:** Using Pinot you can consume via Kafka and store in S3, but it
will be Pinot index format.  
**@mustafa:** Got it, thanks! Is there any other service that you know that
can handle this on a very big scale?  
**@mayanks:** Usually done via ETL pipelines (as you may need transforms on
Kafka topic before storing). Depending on your specific requirements there may
be standard solutions out there you can try.  
**@g.kishore:** @mustafa Gobblin can do that  
**@g.kishore:** there are many projects that can move data from Kafka to S3,
Kafka Connect as well  
**@mustafa:** thanks @g.kishore, I'll check it out. I have identified ,
@mayanks pointed out . I'll review them and pick according to our needs,
thanks a lot  

###  _#random_

  
 **@huytc94:** @huytc94 has joined the channel  
 **@kartikaybagla:** @kartikaybagla has joined the channel  
 **@riya.tyagi:** @riya.tyagi has joined the channel  
 **@mustafa:** @mustafa has joined the channel  
 **@avasudevan:** @avasudevan has joined the channel  

###  _#troubleshooting_

  
 **@huytc94:** @huytc94 has joined the channel  
 **@kartikaybagla:** @kartikaybagla has joined the channel  
 **@riya.tyagi:** @riya.tyagi has joined the channel  
 **@mustafa:** @mustafa has joined the channel  
 **@avasudevan:** @avasudevan has joined the channel  

###  _#thirdeye-pinot_

  
 **@kartikaybagla:** @kartikaybagla has joined the channel  

###  _#minion-improvements_

  
 **@laxman:** Able to reproduce the NPE issue with minor modifications to
existing test ```diff --git a/pinot-
core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java
b/pinot-
core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java
index 66fd61aedb..1c3d70ad7f 100644 \--- a/pinot-
core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java
+++ b/pinot-
core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java
@@ -87,7 +87,7 @@ public class SegmentProcessingFrameworkTest { private final
List<Object[]> _multiValue = Lists .newArrayList(new Object[]{new
String[]{"a", "b"}, 1000, 1597795200000L}, new Object[]{new String[]{"a"},
1000, 1597795200000L}, new Object[]{new String[]{"a"}, 1000, 1597795200000L},
\- new Object[]{new String[]{"a", "b"}, 1000, 1597795200000L}); \+ new
Object[]{new String[]{"a", "b"}, 1000, 1597795200000L}, new Object[]{null,
1000, 1597795200000L}); @BeforeClass public void setup() @@ -98,7 +98,7 @@
public class SegmentProcessingFrameworkTest {
.addSingleValueDimension("campaign",
FieldSpec.DataType.STRING).addMetric("clicks", ) .addDateTime("timeValue",
FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
_pinotSchemaMV = new Schema.SchemaBuilder().setSchemaName("mySchema") \-
.addMultiValueDimension("campaign",
FieldSpec.DataType.STRING).addMetric("clicks", ) \+
.addMultiValueDimension("campaign", FieldSpec.DataType.STRING,
"").addMetric("clicks", ) .addDateTime("timeValue", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); _baseDir = new
File(FileUtils.getTempDirectory(), "segment_processor_framework_test_" +
System.currentTimeMillis());```  
 **@laxman:** Please note that above diff generated on 0.7.1 release.  
 **@laxman:** Root cause: I think the issue is with a multi value dimension
with a defaultNullValue causing the reducer to throw NPE  
 **@laxman:** @jackie.jxt @npawar @fx19880617: I can sense its a minor issue
but not able to pin point the buggy code. Segment mapper is reading the
segments using the NoOpTransformer. Should it be stacked on top of default
CompositeTransformer?  
 **@laxman:** I actually patched and tested this. In SegmentMapper, used
default transformer before applying configured transformer(NoOpTransformer in
this case). But it didn’t work.  
 **@npawar:** Thanks for the test! I'll take a look  
 **@laxman:** Any hints on how to fix this? I can try and test now.  
 **@laxman:** Able to fix the issue with the following patch  
 **@laxman:**  
 **@laxman:** Can you please review once. Somehow I’m not fully convinced with
the fix I did. But it works. I used null transformer in reducer. Though the
fixed the issue, I’m somehow not feeling that’s the right fix.  
 **@laxman:** cc: @fx19880617 @jackie.jxt @npawar  
 **@jackie.jxt:** I tried the test, and seems it doesn't work only when the
default value is set to empty string. I feel the problem is in the record
collector where empty string is ignored  
 **@laxman:** I don’t see collector as the problem. Please check the above
patch.  
 **@laxman:** I will explain why I arrived at this NullTransformer approach  
**@laxman:** Mapper flow ========= 1\. input: generic row (from segment file)
- Nulls are taken care here while reading from segment file. Nulls are
converted to default values as defined in pinot schmea. 2\. output: generic
avro record (Nulls of an array are converted to default value) Reducer flow
========= 1\. input: generice avro (from mapper output file) - Nulls are
nulls. 2\. output: generic avro (Attempt to write null for an array is not
accepted for Avro DataFileWriter)  
**@jackie.jxt:** I understand why adding a NullTransformer works, but I need
to understand why the collector returns `null` for the field with default
value  
 **@jackie.jxt:** Also, why it has different behavior on empty string default
value and non-empty default value. They should have the same behavior  
 **@jackie.jxt:** What I'm saying is if you revert this, then the collector is
able to set the value, which doesn't seem correct to me: ```-
.addMultiValueDimension("campaign",
FieldSpec.DataType.STRING).addMetric("clicks", ) \+
.addMultiValueDimension("campaign", FieldSpec.DataType.STRING,
"").addMetric("clicks", )```  
**@laxman:** If I revert this, I can’t set a null value.  
**@laxman:** Collector I feel is processing correctly here. Also, btw, in my
test env, I’m having issue with Concat collector.  
**@jackie.jxt:** I tried reverting this one, and the NPE is gone  
**@laxman:** Here are the actuals  
**@laxman:** Without default value. Without Null value transformer.
```04:05:58.715 SegmentReducer - Reducer: pinot record: { "fieldToValueMap" :
{ "campaign" : [ "null" ], "clicks" : 1000, "timeValue" : 1597795200000 },
"nullValueFields" : [ ] } 04:05:58.715 SegmentReducer - Reducer: avro record:
{"campaign": ["null"], "clicks": 1000, "timeValue": 1597795200000}```  
**@laxman:** “null” as a string!!! How can the downstream even differentiate
between valid string array vs `null` as a string after this conversion?  
**@laxman:** With “” as defaultNullValue. With Null value transformer patch
```04:11:51.784 SegmentReducer - ================================ 04:11:51.785
SegmentReducer - Reducer: pinot record: { "fieldToValueMap" : { "campaign" : [
"" ], "clicks" : 1000, "timeValue" : 1597795200000 }, "nullValueFields" : [
"campaign" ] } 04:11:51.785 SegmentReducer - Reducer: avro record:
{"campaign": [""], "clicks": 1000, "timeValue": 1597795200000}```  
**@laxman:** Please notice the difference in `"nullValueFields"`  
 **@jackie.jxt:** @npawar ^^  
 **@laxman:** Mapper flow ========= 1\. input: generic row (from segment file)
- Nulls are taken care here while reading from segment file. Nulls are
converted to default values as defined in pinot schmea. 2\. output: generic
avro record (Nulls of an array are converted to default value) Reducer flow
========= 1\. input: generice avro (from mapper output file) - Nulls are
nulls. 2\. output: generic avro (Attempt to write null for an array is not
accepted for Avro DataFileWriter)  
**@laxman:** @jackie.jxt: is there any side affect or functional issue you see
using this NullValueTransformer as I did in the patch?  
 **@laxman:** I want to push this to our test setup and try if there are no
known issues  
 **@jackie.jxt:** My concern is on the inconsistent behavior on empty and non-
empty default value handling. Ideally we should not need NullValueTransformer
because all the transform should already be done in the mapper phase.  
 **@jackie.jxt:** I'm planning to enhance this part of the code and get rid of
avro intermediate file. @laxman If this is not urgent, can you please wait for
2-3 weeks?  
**@laxman:** @jackie.jxt: I can wait for the enhancement. But is it possible
to fix the functional issue first quickly. I mean, NPE alone. If this is
fixed, we will be unblocked and be able to validate several other things in
this conversion flow and purger flow.  
 **@jackie.jxt:** Once I have the PR ready, I can post it here for you to test
it out  
 **@npawar:** sorry i’m a lil caught up with some other things today. will try
to look in the evening  
 **@laxman:** @jackie.jxt: I can wait for the enhancement. But is it possible
to fix the functional issue first quickly. I mean, NPE alone. If this is
fixed, we will be unblocked and be able to validate several other things in
this conversion flow and purger flow.  
 **@jackie.jxt:** Sure, will take a look soon  

###  _#complex-type-support_

  
 **@jackie.jxt:** Support for nested (multi-dimensional) array:  
**@jackie.jxt:** @steotia @amrish.k.lal Please take a look  

###  _#fix_llc_segment_upload_

  
 **@changliu:** @changliu has joined the channel  
 **@ssubrama:** @ssubrama has joined the channel  
 **@tingchen:** @tingchen has joined the channel  
 **@changliu:** Hi @ssubrama @tingchen, just want to have a quick discussion
about this issue: how to . I think I will combine the suggestions from both of
you: Maintain an in-memory Map (key: table name, value: list of LLC segment
name to be fixed). A queue may not work properly since the fix is triggered
based on table level. When the key (table name) appears in the map, avoid ZK
access, try to fix the the segment in the map entry’s value list. If not
appear in the map, get the ZK metadata. One corner case is that some segments
failed multiple times so that it maintains in the map and never gets deleted.
To avoid such issue, we can keep a counter of how many rounds a table has been
tried to fix since last ZK access. This counter may be also in a format of map
(key: table name; value: number of rounds passed since last ZK access for this
table). If several rounds have been passed, we will need to fetch the ZK
metadata again, regardless if any segment left in the segment Map for the
table.
\----------------------------------------------------------------------------------------------------------------------------
The above implementation might be over complicated. I think probably we can
just build a counter map to count how many rounds it passed since the the last
ZK access per table level? i.e. if the threshold is 10, get the segment list
from ZK in round 1 and try to fix, the 9 remaining rounds just ignore this
table.
\----------------------------------------------------------------------------------------------------------------------------
Another option is to add the segment to the in-memory map during committing
phase, but in that case we may encounter issues that the segments failed to be
added to the map and it never gets fixed. In addition, the above issues can
also appear (failed multiple times so the segment name remains in map
forever).  
 **@ssubrama:** Create a cache of segment names in the controller. When we add
or remove a segment, we will update the cache after a successful operation. We
use the cache to get the segment names. If the cache is empty, then we will
fetch from zk for that table, on demand.  
 **@ssubrama:** We can add this cache later. For now, make the whole thing
configurable so that only if deepstore bypass is used, we do this.  
 **@changliu:** Yes I agree we need to add some cache (e.g. an in memory map
for each table as separate entry). But another issue is that if a segment
fails to be fixed every time, it’s stuck in cache. As a result, the controller
skip ZK access for that table from that moment on because the cache is never
purged. So I think we may need some logic to be aware of this situation, e.g.
a counter may help: after certain number of rounds, we need to access ZK for
that table regardless if any cache exists.  
 **@changliu:** Also for the tables with all the segments stored in deep
storage, the cache is empty. So basically controller will try to fetch from ZK
for these tables in every round of fix. I think a counter may also help in
this situation. e.g. we only fetch the ZK metadata after x rounds even if the
cache is empty for a table  
\--------------------------------------------------------------------- To
unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org For additional commands,
e-mail: dev-help@pinot.apache.org