You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by Charles Givre <cg...@gmail.com> on 2020/03/24 18:46:40 UTC

Excessive Memory Use in Parquet Files (From Drill Slack Channel)

Idan Sheinberg  8:21 AM
Hi  there
I'm trying run a simple offset query (ORDER BY timestamp LIMIT 500 OFFSET 1000) against rather complex parquet files (say 4 columns, once being an array currently consisting of a single element comprised of 15 columns)
All files share the same Schema, of course.
 User Error Occurred: One or more nodes ran out of memory while executing the query. (null)
org.apache.drill.common.exceptions.UserException: RESOURCE ERROR: One or more nodes ran out of memory while executing the query.
null
[Error Id: 67b61fc9-320f-47a1-8718-813843a10ecc ]
	at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:657)
	at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:338)
	at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.drill.exec.exception.OutOfMemoryException: null
	at org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew(AbstractContainerVector.java:59)
	at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.allocateOutgoingRecordBatch(PartitionerTemplate.java:380)
	at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.initializeBatch(PartitionerTemplate.java:400)
	at org.apache.drill.exec.test.generated.PartitionerGen5.setup(PartitionerTemplate.java:126)
	at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createClassInstances(PartitionSenderRootExec.java:263)
	at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createPartitioner(PartitionSenderRootExec.java:218)
	at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext(PartitionSenderRootExec.java:188)
	at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93)
	at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:323)
	at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:310)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:310)
	... 4 common frames omitted
Now, I'm running this query from a 16 core, 32GB Ram machine, with Heap sized at 20GB, Eden sized at 16GB (added manually to JAVA_OPTS) and Direct Sized at 8 GB.
By querying sys.memory I can confirm all limits apply. At no point throughout the query Am I nearing memory limit of the HEAP/DIRECT or the OS itself





8:25
However, due to the way org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew is impelmented
8:27
@Override
  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }
8:27
The actual exception/error is swallowed, and I have no idea what's the cause of the failure
8:28
The data-set itself consists of say 15 parquet files, each one weighing at about 100kb
8:30
but as mentioned earlier, the parquet files are a bit more complex than the usual.
8:32
@cgivre @Vova Vysotskyi is there anything I can do or tweak to make this error go away?

cgivre  8:40 AM
Hmm...
8:40
This may be a bug.  Can you create an issue on our JIRA board?

Idan Sheinberg  8:43 AM
Sure
8:43
I'll get to it

cgivre  8:44 AM
I'd like for Paul Rogers to see this as I think he was the author of some of this.

Idan Sheinberg  8:44 AM
Hmm. I'll keep that in mind

cgivre  8:47 AM
We've been refactoring some of the complex readers as well, so its possible that is caused this, but I'm not really sure.
8:47
What version of Drill?

cgivre  9:11 AM
This kind of info is super helpful as we're trying to work out all these details.
9:11
Reading schemas on the fly is not trivial, so when we find issues, we do like to resolve them

Idan Sheinberg  9:16 AM
This is drill 0.18 -SNAPSHOT as of last month
9:16
Ummmm
9:16
I do think I managed to resolve the issue however
9:16
I'm going to run some additional tests and let you know

cgivre  9:16 AM
What did you do?
9:17
You might want to rebase with today's build as well

Idan Sheinberg  9:21 AM
I'll come back with the details in a few moments

cgivre  9:38 AM
Thx
new messages

Idan Sheinberg  9:50 AM
Ok. See it seems as though it's a combination of a few things.
The data-set in question is still small (as mentioned before), but we are setting planner.slice_target  to an extremely low value in order to trigger parallelism and speed up parquet parsing by using multiple fragments.
We have 16 cores, 32 GB (C5.4xlarge on AWS) but we set planner.width.max_per_node  to further increase parallelism.  it seems as though each fragment is handling parquet parsing on it's own, and somehow incurs a great burden on
the direct memory buffer pool, as I do see 16GB peaks of direct memory usage after lowering the planner.width.max_per_node to 16 (our available core).
The query planner itself reports the HASH_PARTITION_SENDER as the largest phase with 1-2 GB of memory utilization
Seeing such an impact (16 GB of direct memory) for 1K items spread across 15 files, even with a very complex parquet schema, seems unreasonable to me

Re: Excessive Memory Use in Parquet Files (From Drill Slack Channel)

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Charles,
Thanks for forwarding this. Looks like Idan found the right answer. Still, I repeated the analysis and have some suggestions.

Looked at the code mentioned in the message chain. This is a place where our error handling could use work:

  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }

Some default allocation failed, but we preserve none of the relevant information: nothing about the kind of vector, nothing about the cause of the failure. There are dozens of implementations of allocateNewSafe(); it is impossible to determine which was called.

A typical implementation:
  public boolean allocateNewSafe() {
    long curAllocationSize = ...
    try{
      allocateBytes(curAllocationSize);
    } catch (DrillRuntimeException ex) {
      return false;
    }
    return true;
  }


We catch the exception, then ignore it. Sign... We can fix all this, but it does not help with this specific issue. See DRILL-7658.

As it turns out, most of the implementations are in the generated vector classes. These classes, oddly, have their own redundant copy of allocateNewSafe(). Since we don't see those methods on the stack, we can quickly narrow down the candidates to:

* AbstractMapVector (any map vector)
* A few others that won't occur for Parquet


Given this, it means the allocation is failing when allocating a map. Idan mentions "one column is an array of a single element comprised of 15 columns". We can presume that the "element" is actually a map, and that the map has 15 columns.

So, looks like the map allocation failed during a partition sender (the next element on the stack). The partition sender takes incoming batches (presumably from he scan, though the stack trace does not say because were at the root of the DAG), and splits them by key to destination nodes.

Idan mentions the query runs on a single machine. So, the partitions are only to threads on that same machine. Idan mentions a 16-core machine. Since Drill parallelizes queries to 70% of the cores, we may be running 11 threads, so each partition sender tries to buffer data for 11 receivers. Each will buffer three batches of data for a total of 33 batches.

Next we need to know how many records are in each batch. Seems we have two default values, defined in drill-override.conf:


    store.parquet.flat.batch.num_records: 32767,
    store.parquet.complex.batch.num_records: 4000,


If we think the record has a map, then perhaps Parquet choose the "complex" count of 4000 records? I think this can be checked by looking at the query profile which, if I recall, should be produced even for a failed query.

So, let's guess 4000 records * 33 buffered batches = 130K records. We don't know the size of each, however. (And, note that Idan said that he artificially increased parallelism, so the buffering need is greater than the above back-of-the-envelope calcs.)


We do know the size of the data: 15 files of 150K each. Let's assume that is compressed. So, if all files are in memory, that would be 15 * 150K * 10:1 compression ratio = 22 MB, which is tiny. So, it is unlikely that, Drill is actually buffering all 33 batches. This tells us that something else is going wrong; we are not actually running out memory for data, just as Idan suggested, we are exhausting memory for some other reason.


Reading further it looks like Idan found his own solution. He increased parallelism to the point where the internal buffering of each Parquet reader used up all available memory. This is probably a bug, but Parquet is a a fiendishly complex beast. Over time, people threw all kinds of parallel readers, buffering and other things at it to beat Impala in TPC benchmarks.

Since a query that finishes is faster than a highly-tuned query that crashes, I'd recommend throttling the slice count back. You really only need as many as there are cores. In fact, you need less. Unlike other readers, Parquet launches a bunch of its own parallel readers so each single Parquet reader will have many (I don't recall the number) of parallel column readers, each aggressively buffering everything it can.

Since the data is small, there is no need for such heroics: Drill can read 20+ meg of data quite quickly, even with a few threads. So, try that first and see if that works.

Once the query works, study the query profile to determine the memory budget and CPU usage. Tune from there, keeping memory well within the available bounds.


Thanks,
- Paul

 

    On Tuesday, March 24, 2020, 11:46:47 AM PDT, Charles Givre <cg...@gmail.com> wrote:  
 
 
Idan Sheinberg  8:21 AM
Hi  there
I'm trying run a simple offset query (ORDER BY timestamp LIMIT 500 OFFSET 1000) against rather complex parquet files (say 4 columns, once being an array currently consisting of a single element comprised of 15 columns)
All files share the same Schema, of course.
 User Error Occurred: One or more nodes ran out of memory while executing the query. (null)
org.apache.drill.common.exceptions.UserException: RESOURCE ERROR: One or more nodes ran out of memory while executing the query.
null
[Error Id: 67b61fc9-320f-47a1-8718-813843a10ecc ]
    at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:657)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:338)
    at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.drill.exec.exception.OutOfMemoryException: null
    at org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew(AbstractContainerVector.java:59)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.allocateOutgoingRecordBatch(PartitionerTemplate.java:380)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.initializeBatch(PartitionerTemplate.java:400)
    at org.apache.drill.exec.test.generated.PartitionerGen5.setup(PartitionerTemplate.java:126)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createClassInstances(PartitionSenderRootExec.java:263)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createPartitioner(PartitionSenderRootExec.java:218)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext(PartitionSenderRootExec.java:188)
    at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:323)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:310)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:310)
    ... 4 common frames omitted
Now, I'm running this query from a 16 core, 32GB Ram machine, with Heap sized at 20GB, Eden sized at 16GB (added manually to JAVA_OPTS) and Direct Sized at 8 GB.
By querying sys.memory I can confirm all limits apply. At no point throughout the query Am I nearing memory limit of the HEAP/DIRECT or the OS itself





8:25
However, due to the way org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew is impelmented
8:27
@Override
  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }
8:27
The actual exception/error is swallowed, and I have no idea what's the cause of the failure
8:28
The data-set itself consists of say 15 parquet files, each one weighing at about 100kb
8:30
but as mentioned earlier, the parquet files are a bit more complex than the usual.
8:32
@cgivre @Vova Vysotskyi is there anything I can do or tweak to make this error go away?

cgivre  8:40 AM
Hmm...
8:40
This may be a bug.  Can you create an issue on our JIRA board?

Idan Sheinberg  8:43 AM
Sure
8:43
I'll get to it

cgivre  8:44 AM
I'd like for Paul Rogers to see this as I think he was the author of some of this.

Idan Sheinberg  8:44 AM
Hmm. I'll keep that in mind

cgivre  8:47 AM
We've been refactoring some of the complex readers as well, so its possible that is caused this, but I'm not really sure.
8:47
What version of Drill?

cgivre  9:11 AM
This kind of info is super helpful as we're trying to work out all these details.
9:11
Reading schemas on the fly is not trivial, so when we find issues, we do like to resolve them

Idan Sheinberg  9:16 AM
This is drill 0.18 -SNAPSHOT as of last month
9:16
Ummmm
9:16
I do think I managed to resolve the issue however
9:16
I'm going to run some additional tests and let you know

cgivre  9:16 AM
What did you do?
9:17
You might want to rebase with today's build as well

Idan Sheinberg  9:21 AM
I'll come back with the details in a few moments

cgivre  9:38 AM
Thx
new messages

Idan Sheinberg  9:50 AM
Ok. See it seems as though it's a combination of a few things.
The data-set in question is still small (as mentioned before), but we are setting planner.slice_target  to an extremely low value in order to trigger parallelism and speed up parquet parsing by using multiple fragments.
We have 16 cores, 32 GB (C5.4xlarge on AWS) but we set planner.width.max_per_node  to further increase parallelism.  it seems as though each fragment is handling parquet parsing on it's own, and somehow incurs a great burden on
the direct memory buffer pool, as I do see 16GB peaks of direct memory usage after lowering the planner.width.max_per_node to 16 (our available core).
The query planner itself reports the HASH_PARTITION_SENDER as the largest phase with 1-2 GB of memory utilization
Seeing such an impact (16 GB of direct memory) for 1K items spread across 15 files, even with a very complex parquet schema, seems unreasonable to me  

Re: Excessive Memory Use in Parquet Files (From Drill Slack Channel)

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Charles,
Thanks for forwarding this. Looks like Idan found the right answer. Still, I repeated the analysis and have some suggestions.

Looked at the code mentioned in the message chain. This is a place where our error handling could use work:

  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }

Some default allocation failed, but we preserve none of the relevant information: nothing about the kind of vector, nothing about the cause of the failure. There are dozens of implementations of allocateNewSafe(); it is impossible to determine which was called.

A typical implementation:
  public boolean allocateNewSafe() {
    long curAllocationSize = ...
    try{
      allocateBytes(curAllocationSize);
    } catch (DrillRuntimeException ex) {
      return false;
    }
    return true;
  }


We catch the exception, then ignore it. Sign... We can fix all this, but it does not help with this specific issue. See DRILL-7658.

As it turns out, most of the implementations are in the generated vector classes. These classes, oddly, have their own redundant copy of allocateNewSafe(). Since we don't see those methods on the stack, we can quickly narrow down the candidates to:

* AbstractMapVector (any map vector)
* A few others that won't occur for Parquet


Given this, it means the allocation is failing when allocating a map. Idan mentions "one column is an array of a single element comprised of 15 columns". We can presume that the "element" is actually a map, and that the map has 15 columns.

So, looks like the map allocation failed during a partition sender (the next element on the stack). The partition sender takes incoming batches (presumably from he scan, though the stack trace does not say because were at the root of the DAG), and splits them by key to destination nodes.

Idan mentions the query runs on a single machine. So, the partitions are only to threads on that same machine. Idan mentions a 16-core machine. Since Drill parallelizes queries to 70% of the cores, we may be running 11 threads, so each partition sender tries to buffer data for 11 receivers. Each will buffer three batches of data for a total of 33 batches.

Next we need to know how many records are in each batch. Seems we have two default values, defined in drill-override.conf:


    store.parquet.flat.batch.num_records: 32767,
    store.parquet.complex.batch.num_records: 4000,


If we think the record has a map, then perhaps Parquet choose the "complex" count of 4000 records? I think this can be checked by looking at the query profile which, if I recall, should be produced even for a failed query.

So, let's guess 4000 records * 33 buffered batches = 130K records. We don't know the size of each, however. (And, note that Idan said that he artificially increased parallelism, so the buffering need is greater than the above back-of-the-envelope calcs.)


We do know the size of the data: 15 files of 150K each. Let's assume that is compressed. So, if all files are in memory, that would be 15 * 150K * 10:1 compression ratio = 22 MB, which is tiny. So, it is unlikely that, Drill is actually buffering all 33 batches. This tells us that something else is going wrong; we are not actually running out memory for data, just as Idan suggested, we are exhausting memory for some other reason.


Reading further it looks like Idan found his own solution. He increased parallelism to the point where the internal buffering of each Parquet reader used up all available memory. This is probably a bug, but Parquet is a a fiendishly complex beast. Over time, people threw all kinds of parallel readers, buffering and other things at it to beat Impala in TPC benchmarks.

Since a query that finishes is faster than a highly-tuned query that crashes, I'd recommend throttling the slice count back. You really only need as many as there are cores. In fact, you need less. Unlike other readers, Parquet launches a bunch of its own parallel readers so each single Parquet reader will have many (I don't recall the number) of parallel column readers, each aggressively buffering everything it can.

Since the data is small, there is no need for such heroics: Drill can read 20+ meg of data quite quickly, even with a few threads. So, try that first and see if that works.

Once the query works, study the query profile to determine the memory budget and CPU usage. Tune from there, keeping memory well within the available bounds.


Thanks,
- Paul

 

    On Tuesday, March 24, 2020, 11:46:47 AM PDT, Charles Givre <cg...@gmail.com> wrote:  
 
 
Idan Sheinberg  8:21 AM
Hi  there
I'm trying run a simple offset query (ORDER BY timestamp LIMIT 500 OFFSET 1000) against rather complex parquet files (say 4 columns, once being an array currently consisting of a single element comprised of 15 columns)
All files share the same Schema, of course.
 User Error Occurred: One or more nodes ran out of memory while executing the query. (null)
org.apache.drill.common.exceptions.UserException: RESOURCE ERROR: One or more nodes ran out of memory while executing the query.
null
[Error Id: 67b61fc9-320f-47a1-8718-813843a10ecc ]
    at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:657)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:338)
    at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.drill.exec.exception.OutOfMemoryException: null
    at org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew(AbstractContainerVector.java:59)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.allocateOutgoingRecordBatch(PartitionerTemplate.java:380)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.initializeBatch(PartitionerTemplate.java:400)
    at org.apache.drill.exec.test.generated.PartitionerGen5.setup(PartitionerTemplate.java:126)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createClassInstances(PartitionSenderRootExec.java:263)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createPartitioner(PartitionSenderRootExec.java:218)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext(PartitionSenderRootExec.java:188)
    at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:323)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:310)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:310)
    ... 4 common frames omitted
Now, I'm running this query from a 16 core, 32GB Ram machine, with Heap sized at 20GB, Eden sized at 16GB (added manually to JAVA_OPTS) and Direct Sized at 8 GB.
By querying sys.memory I can confirm all limits apply. At no point throughout the query Am I nearing memory limit of the HEAP/DIRECT or the OS itself





8:25
However, due to the way org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew is impelmented
8:27
@Override
  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }
8:27
The actual exception/error is swallowed, and I have no idea what's the cause of the failure
8:28
The data-set itself consists of say 15 parquet files, each one weighing at about 100kb
8:30
but as mentioned earlier, the parquet files are a bit more complex than the usual.
8:32
@cgivre @Vova Vysotskyi is there anything I can do or tweak to make this error go away?

cgivre  8:40 AM
Hmm...
8:40
This may be a bug.  Can you create an issue on our JIRA board?

Idan Sheinberg  8:43 AM
Sure
8:43
I'll get to it

cgivre  8:44 AM
I'd like for Paul Rogers to see this as I think he was the author of some of this.

Idan Sheinberg  8:44 AM
Hmm. I'll keep that in mind

cgivre  8:47 AM
We've been refactoring some of the complex readers as well, so its possible that is caused this, but I'm not really sure.
8:47
What version of Drill?

cgivre  9:11 AM
This kind of info is super helpful as we're trying to work out all these details.
9:11
Reading schemas on the fly is not trivial, so when we find issues, we do like to resolve them

Idan Sheinberg  9:16 AM
This is drill 0.18 -SNAPSHOT as of last month
9:16
Ummmm
9:16
I do think I managed to resolve the issue however
9:16
I'm going to run some additional tests and let you know

cgivre  9:16 AM
What did you do?
9:17
You might want to rebase with today's build as well

Idan Sheinberg  9:21 AM
I'll come back with the details in a few moments

cgivre  9:38 AM
Thx
new messages

Idan Sheinberg  9:50 AM
Ok. See it seems as though it's a combination of a few things.
The data-set in question is still small (as mentioned before), but we are setting planner.slice_target  to an extremely low value in order to trigger parallelism and speed up parquet parsing by using multiple fragments.
We have 16 cores, 32 GB (C5.4xlarge on AWS) but we set planner.width.max_per_node  to further increase parallelism.  it seems as though each fragment is handling parquet parsing on it's own, and somehow incurs a great burden on
the direct memory buffer pool, as I do see 16GB peaks of direct memory usage after lowering the planner.width.max_per_node to 16 (our available core).
The query planner itself reports the HASH_PARTITION_SENDER as the largest phase with 1-2 GB of memory utilization
Seeing such an impact (16 GB of direct memory) for 1K items spread across 15 files, even with a very complex parquet schema, seems unreasonable to me