Posted to dev@iceberg.apache.org by Mayur Srivastava <Ma...@twosigma.com> on 2021/11/30 20:33:47 UTC

High memory usage with highly concurrent committers

Hi Iceberg Community,

I'm running some experiments with high commit contention (on the same Iceberg table writing to different partitions) and I'm observing very high memory usage (5G to 7G). (Note that the data being written is very small.)

The scenario is described below:

Note1: The catalog used is similar to the JDBC catalog.
Note2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.
Note3: Iceberg code is ~6 months old. I haven't tried the latest main branch.

Experiment params:
a. NT = 64 = number of parallel committers. Achieved using multiple threads within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough retries are done so that all committers finish successfully).
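
For reference, the three retry parameters above bound a capped exponential backoff between commit attempts. Below is a minimal sketch of such a schedule. The doubling factor and the names BackoffSketch and delayMs are assumptions made for illustration; this is not Iceberg's actual implementation, which may also add random jitter.

```java
// Sketch only: capped exponential backoff between commit retries.
// The doubling factor is an assumption; a real schedule may add jitter.
class BackoffSketch {
    /** Delay before retry number `attempt` (1-based): doubles from minWaitMs, capped at maxWaitMs. */
    static long delayMs(int attempt, long minWaitMs, long maxWaitMs) {
        double uncapped = minWaitMs * Math.pow(2.0, attempt - 1);
        return (long) Math.min(uncapped, (double) maxWaitMs);
    }

    public static void main(String[] args) {
        long minWait = 100;     // ms, one of the values used in the experiment
        long maxWait = 10_000;  // ms, one of the values used in the experiment
        for (int attempt = 1; attempt <= 10; attempt++) {
            System.out.printf("retry %d: wait %d ms%n", attempt, delayMs(attempt, minWait, maxWait));
        }
    }
}
```

Under these assumptions, with minWait=100 ms and maxWait=10 s the wait reaches the cap by the eighth retry, so most of the 256 allowed retries would wait the full maxWait.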

Steps:

* Create an Iceberg table with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create NT different AppendFiles objects.
* Sequential write step: for 1 to NT, write 1 row (in a unique month) and append the DataFile to the corresponding AppendFiles object. In effect, we create one Parquet file per month (i.e. per partition) containing a single row. This keeps the data size small for the experiment and ensures that each commit touches a different partition.
* Parallel commit step: create a thread pool of NT threads, submit a Runnable that calls AppendFiles.commit(), and keep the Future, i.e. run the commits in parallel.
* Wait for all Futures to finish.

I ran this experiment with various parameter values. For example, I varied minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT over (8, 16, 32, 64, 128). Code snippets can be found below.

Observations:
A. Total elapsed commit time increases with the number of committers, which is expected. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC under high concurrency.
B. The number of table metadata files is a multiple of the number of committers. E.g. with NT=64, minWait=100 ms, maxWait=10 s, there were 380 table metadata JSON files in total. This is acceptable given the nature of OCC under high concurrency.
C. Memory usage periodically shoots up to 7G in some experiments. This is noticeable (i.e. memory usage > 1G) when the number of concurrent committers is >= 16 and gets worse as the number of committers increases. I've not investigated further, but it could be that the in-memory metadata (snapshots, etc.) is growing very large. If I serialize the commit attempts (e.g. by acquiring a lock), the high memory usage problem goes away. But I wanted to check here before trying out any alternative.
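
The lock-based workaround mentioned in observation C can be sketched as follows. This is only an illustration under stated assumptions: the class and method names (SerializedCommitSketch, commitSerialized, maxConcurrentCommits) are hypothetical, and the Runnable stands in for the real call to commit() on each pending append.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class SerializedCommitSketch {
    // One lock shared by all committers of the same table (hypothetical helper).
    private static final ReentrantLock COMMIT_LOCK = new ReentrantLock();

    /** Runs the commit action while holding the lock, so at most one commit is in flight. */
    static void commitSerialized(Runnable commitAction) {
        COMMIT_LOCK.lock();
        try {
            commitAction.run();
        } finally {
            COMMIT_LOCK.unlock();
        }
    }

    /** Submits nt commits to a thread pool and returns the highest number seen running at once. */
    static int maxConcurrentCommits(int nt) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(nt);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < nt; i++) {
                futures.add(pool.submit(() -> commitSerialized(() -> {
                    // Stand-in for the real commit() call on the i-th pending append.
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    inFlight.decrementAndGet();
                })));
            }
            for (Future<?> future : futures) {
                try {
                    future.get(); // surfaces any failure from the commit task
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent commits: " + maxConcurrentCommits(16));
    }
}
```

The trade-off is that the lock removes the contention inside a single process only; committers in other processes would still contend through the catalog.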

Why is the concurrent commit important to us?
We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases, they end up in the high commit contention scenario described above. My main worry is that this is happening for a single table; if we have multiple tables being committed, the memory usage will be much larger.

Questions:

1. Have others observed this behavior? Is the high memory usage expected, or am I doing something wrong? Is there any way to reduce the memory footprint during the commit (e.g. by changing some metadata config)?

2. What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?

Thanks,
Mayur

Code snippets:

Schema schema = new Schema(
    NestedField.of(1, false, "time", TimestampType.withoutZone()),
    NestedField.of(2, false, "id", IntegerType.get()),
    NestedField.of(3, false, "value", DoubleType.get())
);

catalog.createTable(
    tableIdentifier,
    schema,
    PartitionSpec.builderFor(schema).month("time").build(),
    ImmutableMap.of(
        TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
        TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
        TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
        "write.metadata.previous-versions-max", String.valueOf(1)
    ) // properties
);

// Write data phase.
List<AppendFiles> appendFilesList = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    appendFilesList.add(table.newAppend());
}

for (int m = 0; m < NT; m++) {
    LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
    ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
    writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
}

// Commit phase.
// High memory usage starts from the commit phase.
ExecutorService executors = Executors.newFixedThreadPool(NT);
List<Future<?>> futures = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    final int i = m;
    futures.add(executors.submit(() -> {
        appendFilesList.get(i).commit();
    }));
}

for (int m = 0; m < NT; m++) {
    futures.get(m).get();
}

executors.shutdownNow();

// snippet of writeRecords().
private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
        throws IOException {
    // PartitionedWriterImpl extends PartitionedWriter<Record>
    try (var writer = new PartitionedWriterImpl(table)) {
        for (var record : records) {
            writer.write(record);
        }
        return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
    }
}

Following is the heap usage for one of the experiments where we can see very high heap usage. The initial low usage part is data writes. The high heap usage starts with the commit phase.
[image: heap usage graph]

Re: High memory usage with highly concurrent committers

Posted by Ryan Blue <bl...@tabular.io>.
Igor, you might want to take a look at the recently added GCSFileIO:
https://github.com/apache/iceberg/pull/3711/files

It would be great to get some additional feedback on that!

On Tue, Dec 14, 2021 at 8:28 PM Igor Dvorzhak <id...@google.com.invalid>
wrote:

> Yes, you can still upload files of any size. This property just configures
> the amount of data that is first cached in memory and subsequently sent to
> GCS in one HTTP request during a resumable upload
> <https://cloud.google.com/storage/docs/resumable-uploads> session.
>
> On Mon, Dec 6, 2021 at 4:50 AM Piotr Findeisen <pi...@starburstdata.com>
> wrote:
>
>> Hi
>>
>> Igor, does fs.gs.outputstream.upload.chunk.size affect the file size I
>> can upload?
>> Can I upload e.g. a 1 GB Parquet file while also setting
>> fs.gs.outputstream.upload.chunk.size=8388608 (8 MiB)?
>>
>> Best
>> PF
>>
>>
>> On Fri, Dec 3, 2021 at 5:33 PM Igor Dvorzhak <id...@google.com.invalid>
>> wrote:
>>
>>> No, right now this is a global property for the Hadoop FS instance. You
>>> either need to use different clients/Hadoop FS instances to write different
>>> files or switch to the direct upload mode (
>>> fs.gs.outputstream.direct.upload.enable=true), which could be better
>>> for your use case, depending on the Parquet file sizes that you write. In
>>> this write mode nothing is cached in memory and data is streamed to GCS
>>> directly, but failed uploads cannot be retried.
>>>
>>> Also, you may want to test how critical the 64 MiB buffer size is for your
>>> application. It may be the case that 16 MiB, for example, gets you the
>>> desired performance for Parquet file writes with acceptable memory
>>> consumption.
>>>
>>> On a broader note, this seems to be one of the reasons why it could be
>>> good to have a specialized Iceberg GcsFileIO: if the Iceberg API allows, it
>>> could have separate write configurations optimized for metadata and data
>>> files.
>>>
>>> On Fri, Dec 3, 2021 at 6:24 AM Mayur Srivastava <
>>> Mayur.Srivastava@twosigma.com> wrote:
>>>
>>>> Thanks Igor. This may help mitigate the problem.
>>>>
>>>>
>>>>
>>>> But it looks like it applies to all files. We still want data (parquet)
>>>> files to allocate 64 MiB (seems reasonable). For metadata, a smaller size
>>>> is better. Is there a way to set the property based on file suffix or file
>>>> type?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Mayur
>>>>
>>>>
>>>>
>>>> *From:* Igor Dvorzhak <id...@google.com.INVALID>
>>>> *Sent:* Thursday, December 2, 2021 8:09 PM
>>>> *To:* dev@iceberg.apache.org
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>>
>>>>
>>>> For each written object, the GCS connector allocates ~64 MiB of memory by
>>>> default to improve the performance of large object writes. If you want to
>>>> reduce memory utilization when you write many files at once, you
>>>> just need to reduce the upload chunk size, for example to 8 MiB:
>>>> fs.gs.outputstream.upload.chunk.size=8388608
>>>>
>>>>
>>>>
>>>> On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <
>>>> Mayur.Srivastava@twosigma.com> wrote:
>>>>
>>>> That is correct Daniel.
>>>>
>>>>
>>>>
>>>> I’ve tried to explain our use of S3FileIO with GCS in the “Supporting
>>>> gs:// prefix …” thread.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Mayur
>>>>
>>>>
>>>>
>>>> *From:* Daniel Weeks <da...@gmail.com>
>>>> *Sent:* Wednesday, December 1, 2021 11:46 AM
>>>> *To:* Iceberg Dev List <de...@iceberg.apache.org>
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>>
>>>>
>>>> I feel like what Mayur was saying is that S3FileIO actually works with
>>>> GCS (it appears there is some S3 compatible API for GCS).
>>>>
>>>>
>>>>
>>>> If that is the case, then S3FileIO can be used natively against GCS,
>>>> which wouldn't require the ResolvingFileIO (just support for the GCS URI
>>>> schemes).
>>>>
>>>>
>>>>
>>>> This is new to me and I haven't tested this, but Mayur, if this does
>>>> work, please share how you configured S3FileIO.
>>>>
>>>>
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <ye...@gmail.com> wrote:
>>>>
>>>> We are in the process of supporting multiple file system schemes using
>>>> ResolvingFileIO. Ryan just added the initial implementation:
>>>> https://github.com/apache/iceberg/pull/3593
>>>>
>>>>
>>>>
>>>> -Jack
>>>>
>>>>
>>>>
>>>> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
>>>> Mayur.Srivastava@twosigma.com> wrote:
>>>>
>>>> Thanks Ryan.
>>>>
>>>>
>>>>
>>>> I’m looking at the heap dump. From a preliminary look in jvisualvm, I see
>>>> the following top two objects:
>>>>
>>>> 1. ‘byte[]’: 87% of memory usage (>100k instances with a total
>>>> of 3.2G in one of my tests). I checked some of the references and found that
>>>> they come from
>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader
>>>> <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references
>>>> coming from WriterBasedJsonGenerator, finalizer
>>>> (HadoopPositionOutputStream), etc. I’m not familiar with this code,
>>>> but is it possible that Hadoop output streams are not being closed and close
>>>> is only called by the finalizers?
>>>>
>>>> 2. ‘int[]’: 12% usage (7k instances), but I can’t expand the
>>>> references.
>>>>
>>>>
>>>>
>>>> One interesting finding is that if I switch to S3FileIO, the high
>>>> memory usage goes away and the memory usage is similar to that of serialized
>>>> commits using a lock, which is ~750 M for 128 parallel committers. The
>>>> 750 M usage may be in line with the snapshot and manifest* objects.
>>>>
>>>>
>>>>
>>>> So, the high memory problem manifests only when using the default
>>>> HadoopFileSystem.
>>>>
>>>>
>>>>
>>>> Thanks, Mayur
>>>>
>>>>
>>>>
>>>> PS: I had to change S3FileIO locally to accept gs:// prefix so that it
>>>> works with GCS. Is there a plan to support gs:// prefix in the S3URI?
>>>>
>>>>
>>>>
>>>> *From:* Ryan Blue <bl...@tabular.io>
>>>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>>>> *To:* Iceberg Dev List <de...@iceberg.apache.org>
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>>
>>>>
>>>> Mayur,
>>>>
>>>>
>>>>
>>>> Is it possible to connect to this process with a profiler and look at
>>>> what's taking up all of the space?
>>>>
>>>>
>>>>
>>>> I suspect that what's happening here is that you're loading the list of
>>>> snapshots for each version of metadata, so you're holding a lot of copies
>>>> of the entire snapshot history and possibly caching the list of manifests
>>>> for some snapshots as well.
>>>>
>>>>
>>>>
>>>> I've thought about adding a way to avoid parsing and loading snapshots,
>>>> probably by passing a cache when loading metadata so that all the copies of
>>>> a table can share snapshots in memory. That would work fine because they're
>>>> immutable. That might help you here, although a Snapshot instance will
>>>> cache manifests after loading them if they are accessed, so you'd want to
>>>> watch out for that as well.
>>>>
>>>>
>>>>
>>>> The best step forward is to get an idea of what objects are taking up
>>>> that space with a profiler or heap dump if you can.
>>>>
>>>>
>>>>
>>>> Ryan
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Ryan Blue
>>>>
>>>> Tabular
>>>>
>>>>

-- 
Ryan Blue
Tabular

Re: High memory usage with highly concurrent committers

Posted by Igor Dvorzhak <id...@google.com.INVALID>.
Yes, you can still upload files of any size. This property just configures
the amount of data that is first cached in memory and subsequently sent to
GCS in one HTTP request during a resumable upload
<https://cloud.google.com/storage/docs/resumable-uploads> session.
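
As a rough illustration of why the chunk size matters for the scenario in this thread, the sketch below estimates the buffer memory held by concurrently open output streams. It assumes one upload buffer per open stream and one open stream per committer, which is a simplification; the class and method names (UploadBufferEstimate, bufferBytes) are made up for this example.

```java
// Back-of-the-envelope estimate, not a measurement.
// Assumption: each concurrently open output stream holds one upload buffer.
class UploadBufferEstimate {
    static final long MIB = 1024L * 1024L;

    /** Upper-bound estimate of buffer memory: one chunk-sized buffer per open stream. */
    static long bufferBytes(int openStreams, long chunkSizeBytes) {
        return openStreams * chunkSizeBytes;
    }

    public static void main(String[] args) {
        int committers = 64;            // NT from the original experiment
        long defaultChunk = 64 * MIB;   // ~64 MiB default mentioned in this thread
        long reducedChunk = 8_388_608L; // fs.gs.outputstream.upload.chunk.size=8388608
        System.out.println(bufferBytes(committers, defaultChunk) / MIB + " MiB"); // prints "4096 MiB"
        System.out.println(bufferBytes(committers, reducedChunk) / MIB + " MiB"); // prints "512 MiB"
    }
}
```

Under these assumptions, 64 committers with the default buffer account for about 4 GiB, the same order of magnitude as the 5G to 7G reported at the start of the thread, while an 8 MiB chunk brings the estimate down to 512 MiB.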

On Mon, Dec 6, 2021 at 4:50 AM Piotr Findeisen <pi...@starburstdata.com>
wrote:

> Hi
>
> Igor, does fs.gs.outputstream.upload.chunk.size affect the file size I
> can upload?
> Can i upload e.g. 1GB Parquet file, while also setting fs.gs.outputstream.
> upload.chunk.size=8388608 (8MB / MiB)?
>
> Best
> PF
>
>
> On Fri, Dec 3, 2021 at 5:33 PM Igor Dvorzhak <id...@google.com.invalid>
> wrote:
>
>> No, right now this is a global property for the Hadoop FS instance. You
>> either need to use different clients/Hadoop FS instances to write different
>> files or switch to the direct upload mode (
>> fs.gs.outputstream.direct.upload.enable=true), which could be better for
>> your use case (in this write mode nothing cached in the memory and streamed
>> to GCS directly, but it does not allow failed upload retries), depending on
>> the parquet file sizes that you write.
>>
>> Also, you may want to test how critical 64MiB buffer size is for your
>> application, it may be the case that 16MiB, for example, will get you
>> desired performance for parquet file writes and good enough memory
>> consumption.
>>
>> But on a broader note this seems to be one of the reasons why it could be
>> good to have specialized Iceberg GcsFileIO, if Iceberg API allows, it can
>> have separate write configuration optimized for metadata and data files.
>>
>> On Fri, Dec 3, 2021 at 6:24 AM Mayur Srivastava <
>> Mayur.Srivastava@twosigma.com> wrote:
>>
>>> Thanks Igor. This may help mitigate the problem.
>>>
>>>
>>>
>>> But it looks like it applies to all files. We still want data (parquet)
>>> files to allocate 64 MiB (seems reasonable). For metadata, a smaller size
>>> is better. Is there a way to set the property based on file suffix or file
>>> type?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Mayur
>>>
>>>
>>>
>>> *From:* Igor Dvorzhak <id...@google.com.INVALID>
>>> *Sent:* Thursday, December 2, 2021 8:09 PM
>>> *To:* dev@iceberg.apache.org
>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>
>>>
>>>
>>> For each written object GCS connector allocates ~64MiB of memory by
>>> default to improve performance of large object writes. If you want to
>>> reduce memory utilization in cases when you write many files at once you
>>> just need to reduce upload chunk size to 8MiB, for example:
>>> fs.gs.outputstream.upload.chunk.size=8388608
>>>
>>>
>>>
>>> On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <
>>> Mayur.Srivastava@twosigma.com> wrote:
>>>
>>> That is correct Daniel.
>>>
>>>
>>>
>>> I’ve tried to explain our use of S3FileIO with GCS in the “Supporting
>>> gs:// prefix …” thread.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Mayur
>>>
>>>
>>>
>>> *From:* Daniel Weeks <da...@gmail.com>
>>> *Sent:* Wednesday, December 1, 2021 11:46 AM
>>> *To:* Iceberg Dev List <de...@iceberg.apache.org>
>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>
>>>
>>>
>>> I feel like what Mayur was saying is that S3FileIO actually works with
>>> GCS (it appears there is some S3 compatible API for GCS).
>>>
>>>
>>>
>>> If that is the case, then S3FileIO can be used natively against GCS,
>>> which wouldn't require the ResolvingRileIO (just supporting the GCS URI
>>> schemes).
>>>
>>>
>>>
>>> This is new to me and I haven't tested this, but Mayur, if this does
>>> work, please share how you configured S3FileIO.
>>>
>>>
>>>
>>> -Dan
>>>
>>>
>>>
>>> On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <ye...@gmail.com> wrote:
>>>
>>> We are in the process of supporting multiple file system schemes using
>>> ResolvingFileIO, Ryan just added the initial implementation:
>>> https://github.com/apache/iceberg/pull/3593
>>>
>>>
>>>
>>> -Jack
>>>
>>>
>>>
>>> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
>>> Mayur.Srivastava@twosigma.com> wrote:
>>>
>>> Thanks Ryan.
>>>
>>>
>>>
>>> I’m looking at the heapdump. At a preliminary look in jvisualvm, I see
>>> the following top two objects:
>>>
>>> 1.      ‘byte[]’ : 87% of memory usage, (>100k instances with a total
>>> of 3.2G in one of my tests). I checked some of the reference and find that
>>> they are from
>>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media. MediaHttpUploader
>>> <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references
>>> coming from WriterBasedJsonGenerator, finalizer
>>> (HadoopPositionOutputStream), etc as well. I’m not familiar with this code,
>>> but is it possible that Hadoop output streams are not closed and close is
>>> called the finalizers?
>>>
>>> 2.      ‘int[]’ : 12% usage (7k instances), but I can’t expand the
>>> references.
>>>
>>>
>>>
>>> One interesting finding is that if I switch to the S3FileIO, the high
>>> memory usage goes away and the memory usage is similar to the serialized
>>> commits using a lock which is ~750 M for 128 parallel committers. And the
>>> 750 M usage may fall-in line with the snapshots and manifest* objects.
>>>
>>>
>>>
>>> So, the high memory problem manifests only when using the default
>>> HadoopFileSystem.
>>>
>>>
>>>
>>> Thanks, Mayur
>>>
>>>
>>>
>>> PS: I had to change S3FileIO locally to accept gs:// prefix so that it
>>> works with GCS. Is there a plan to support gs:// prefix in the S3URI?
>>>
>>>
>>>
>>> *From:* Ryan Blue <bl...@tabular.io>
>>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>>> *To:* Iceberg Dev List <de...@iceberg.apache.org>
>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>
>>>
>>>
>>> Mayur,
>>>
>>>
>>>
>>> Is it possible to connect to this process with a profiler and look at
>>> what's taking up all of the space?
>>>
>>>
>>>
>>> I suspect that what's happening here is that you're loading the list of
>>> snapshots for each version of metadata, so you're holding a lot of copies
>>> of the entire snapshot history and possibly caching the list of manifests
>>> for some snapshots as well.
>>>
>>>
>>>
>>> I've thought about adding a way to avoid parsing and loading snapshots,
>>> probably by passing a cache when loading metadata so that all the copies of
>>> a table can share snapshots in memory. That would work fine because they're
>>> immutable. That might help you here, although a Snapshot instance will
>>> cache manifests after loading them if they are accessed, so you'd want to
>>> watch out for that as well.
>>>
>>>
>>>
>>> The best step forward is to get an idea of what objects are taking up
>>> that space with a profiler or heap dump if you can.
>>>
>>>
>>>
>>> Ryan
>>>
>>>
>>>
>>> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <
>>> Mayur.Srivastava@twosigma.com> wrote:
>>>
>>> Hi Iceberg Community,
>>>
>>>
>>>
>>> I’m running some experiments with high commit contention (on the same
>>> Iceberg table writing to different partitions) and I'm observing very high
>>> memory usage (5G to 7G). (Note that the data being written is very small.)
>>>
>>>
>>>
>>> *The scenario is described below:*
>>>
>>>
>>>
>>> *Note1: The catalog used is similar to the JDBC catalog.*
>>>
>>> *Note2: The data is stored on S3 and HadoopFileSystem is used to talk to
>>> S3.*
>>>
>>> *Note3: Iceberg code is ~6 months old. I haven’t tried the latest main
>>> branch.*
>>>
>>>
>>>
>>> *Experiment params:*
>>>
>>> a. NT = 64 = number of parallel committers. Achieved using multiple
>>> threads within the same process.
>>>
>>> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>>>
>>> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>>>
>>> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure
>>> enough retries are done so that all committers finish successfully).
>>>
>>>
>>>
>>> *Steps:*
>>>
>>>
>>>
>>> * *Create an Iceberg table *with three columns: time (timestamp without
>>> timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>>>
>>> * Sequential step: create *NT different AppendFile* objects.
>>>
>>> * Sequential write step: for 1 to NT, *write 1 row* (in a unique month)
>>> and append the DataFile to the corresponding AppendFile. Basically, we
>>> create one parquet file per month (i.e. per partition) containing a single
>>> row. This is done to keep data size small for the experiment. Also, we
>>> ensure that each commit will contain a different partition.
>>>
>>> * *Parallel commit step*: Create a ThreadPool of NT threads, submit a
>>> Runnable which calls *AppendFile.commit()*, and get the Future. I.e.
>>> Run the commits in parallel.
>>>
>>> * Wait for all Futures to finish.
>>>
>>> I ran this experiment with various values for params. For example, I
>>> varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, NT in
>>> (8, 16, 32, 64, 128). Code snippets can be found below.
>>>
>>>
>>>
>>> *Observations:*
>>>
>>> A. Total elapsed commit time increases with the number of committers
>>> which is expected. e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total
>>> elapsed commit time is more than 250 s. This is acceptable given the nature
>>> of OCC in high concurrency.
>>>
>>> B. The number of table metadata files is a multiple of the number of
>>> committers, e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total table
>>> metadata json files was 380. This is acceptable given the nature of OCC in
>>> high concurrency.
>>>
>>> *C. The memory usage which keeps shooting-up  periodically to 7G in some
>>> experiments. This is noticeable (i.e. memory usage > 1G) when number of
>>> concurrent committers >= 16 and becomes worse when number of committers
>>> increase. I’ve not investigated further but it could be that the in-memory
>>> metadata (snapshots, etc.) is growing very large. If I serialize the commit
>>> attempts (e.g. by acquiring a lock), the high memory usage problem goes
>>> away. But, I wanted to check here before trying out any alternative.*
>>>
>>>
>>>
>>> *Why is the concurrent commit important to us?*
>>>
>>> We have several users who use various processing engines to schedule
>>> their writes (into non-overlapping partitions) through a data service that
>>> takes care of writing and committing the data. In many cases, they end up
>>> in the high commit contention scenario as described above. My main worry
>>> here is that this is happening for a single table, if we have multiple
>>> tables being committed, the memory usage will be much larger.
>>>
>>>
>>>
>>> *Questions:  *
>>>
>>> 1.      Have others observed this behavior? Is the high memory usage
>>> expected or am I doing something wrong? Is there any way to reduce the
>>> memory footprint (e.g. by changing some metadata config) during the commit?
>>>
>>> 2.      What is the general recommendation for high concurrent
>>> committers? Is high concurrent committers an anti-pattern for Iceberg?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Mayur
>>>
>>> *Code snippets:*
>>>
>>> Schema schema = new Schema(
>>>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>>>     NestedField.of(2, false, "id", IntegerType.get()),
>>>     NestedField.of(3, false, "value", DoubleType.get())
>>> );
>>>
>>> // Keep the returned Table; it is used by the write and commit phases.
>>> Table table = catalog.createTable(
>>>     tableIdentifier,
>>>     schema,
>>>     PartitionSpec.builderFor(schema).month("time").build(),
>>>     ImmutableMap.of(
>>>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>>>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>>>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>>>         "write.metadata.previous-versions-max", String.valueOf(1)
>>>     ) // properties
>>> );
>>>
>>> // Write data phase: one pending append per committer.
>>> List<AppendFiles> appendFilesList = new ArrayList<>();
>>> for (int m = 0; m < NT; m++) {
>>>     appendFilesList.add(table.newAppend());
>>> }
>>>
>>> // One row per month, so every append touches a different partition.
>>> for (int m = 0; m < NT; m++) {
>>>     LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
>>>     ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
>>>     writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
>>> }
>>>
>>> // Commit phase.
>>> // High memory usage starts from the commit phase.
>>> ExecutorService executors = Executors.newFixedThreadPool(NT);
>>> List<Future<?>> futures = new ArrayList<>();
>>> for (int m = 0; m < NT; m++) {
>>>     final int i = m;
>>>     futures.add(executors.submit(() -> {
>>>         appendFilesList.get(i).commit();
>>>     }));
>>> }
>>>
>>> // Wait for all NT commits to finish.
>>> for (int m = 0; m < NT; m++) {
>>>     futures.get(m).get();
>>> }
>>>
>>> executors.shutdownNow();
>>>
>>> // snippet of writeRecords().
>>> private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
>>>         throws IOException {
>>>     // PartitionedWriterImpl extends PartitionedWriter<Record>
>>>     try (var writer = new PartitionedWriterImpl(table)) {
>>>         for (var record : records) {
>>>             writer.write(record);
>>>         }
>>>         return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>>>     }
>>> }
>>>
>>>
>>>
>>> Following is the heap usage for one of the experiments where we can see
>>> very high heap usage. The initial low usage part is data writes. The high
>>> heap usage starts with the commit phase.
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Ryan Blue
>>>
>>> Tabular
>>>
>>>

Re: High memory usage with highly concurrent committers

Posted by Piotr Findeisen <pi...@starburstdata.com>.
Hi

Igor, does fs.gs.outputstream.upload.chunk.size limit the size of the files
I can upload?
Can I upload e.g. a 1 GB Parquet file while also setting
fs.gs.outputstream.upload.chunk.size=8388608 (8 MiB)?

Best
PF


Re: High memory usage with highly concurrent committers

Posted by Igor Dvorzhak <id...@google.com.INVALID>.
No, right now this is a global property for the Hadoop FS instance. You
either need to use different clients/Hadoop FS instances to write different
files or switch to the direct upload mode
(fs.gs.outputstream.direct.upload.enable=true), which could be better for
your use case, depending on the Parquet file sizes that you write. In this
write mode nothing is cached in memory and data is streamed to GCS directly,
but failed uploads cannot be retried.

Also, you may want to test how critical the 64 MiB buffer size is for your
application. It may be the case that 16 MiB, for example, gives you the
desired performance for Parquet file writes with acceptable memory
consumption.

On a broader note, this seems to be one of the reasons why it could be
good to have a specialized Iceberg GcsFileIO: if the Iceberg API allows, it
could have separate write configurations optimized for metadata and data files.
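
To make the trade-off concrete, here is a rough back-of-the-envelope sketch in Java. It assumes, as a simplification, that every concurrently open output stream holds one full upload chunk in memory; the class and method names are hypothetical and not part of the GCS connector or Iceberg.

```java
// Rough estimate only: assumes each open stream buffers exactly one chunk.
final class UploadBufferEstimate {
    static final long MIB = 1024L * 1024L;

    /** Worst-case buffer bytes if every open stream holds one full chunk. */
    static long bufferBytes(int openStreams, long chunkSizeBytes) {
        return openStreams * chunkSizeBytes;
    }

    public static void main(String[] args) {
        long[] chunkSizesMib = {64, 16, 8};
        int[] openStreams = {16, 64, 128};
        for (long chunkMib : chunkSizesMib) {
            for (int streams : openStreams) {
                long totalMib = bufferBytes(streams, chunkMib * MIB) / MIB;
                System.out.printf("chunk=%d MiB, streams=%d -> %d MiB%n",
                        chunkMib, streams, totalMib);
            }
        }
    }
}
```

Under this assumption, 64 open streams at 64 MiB each already account for 4 GiB, which is in the same range as the heap usage reported in this thread, while 8 MiB chunks would bring the same case down to 512 MiB.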

RE: High memory usage with highly concurrent committers

Posted by Mayur Srivastava <Ma...@twosigma.com>.
Thanks Igor. This may help mitigate the problem.

But it looks like it applies to all files. We still want data (parquet) files to allocate 64 MiB (seems reasonable). For metadata, a smaller size is better. Is there a way to set the property based on file suffix or file type?

Thanks,
Mayur

From: Igor Dvorzhak <id...@google.com.INVALID>
Sent: Thursday, December 2, 2021 8:09 PM
To: dev@iceberg.apache.org
Subject: Re: High memory usage with highly concurrent committers

For each written object, the GCS connector allocates ~64 MiB of memory by default to improve the performance of large object writes. If you want to reduce memory utilization when you write many files at once, you just need to reduce the upload chunk size, for example to 8 MiB: fs.gs.outputstream.upload.chunk.size=8388608

On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <Ma...@twosigma.com> wrote:
That is correct Daniel.

I’ve tried to explain our use of S3FileIO with GCS in the “Supporting gs:// prefix …” thread.

Thanks,
Mayur

From: Daniel Weeks <da...@gmail.com>
Sent: Wednesday, December 1, 2021 11:46 AM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: High memory usage with highly concurrent committers

I feel like what Mayur was saying is that S3FileIO actually works with GCS (it appears there is some S3 compatible API for GCS).

If that is the case, then S3FileIO can be used natively against GCS, which wouldn't require the ResolvingFileIO (just supporting the GCS URI schemes).

This is new to me and I haven't tested this, but Mayur, if this does work, please share how you configured S3FileIO.

-Dan

On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <ye...@gmail.com> wrote:
We are in the process of supporting multiple file system schemes using ResolvingFileIO. Ryan just added the initial implementation: https://github.com/apache/iceberg/pull/3593

-Jack

On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <Ma...@twosigma.com> wrote:
Thanks Ryan.

I’m looking at the heapdump. At a preliminary look in jvisualvm, I see the following top two objects:

1.      ‘byte[]’ : 87% of memory usage (>100k instances with a total of 3.2G in one of my tests). I checked some of the references and found that they come from com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references coming from WriterBasedJsonGenerator, finalizer (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is it possible that the Hadoop output streams are not closed explicitly and close is only called by the finalizers?

2.      ‘int[]’ : 12% usage (7k instances), but I can’t expand the references.

One interesting finding is that if I switch to S3FileIO, the high memory usage goes away and the memory usage is similar to that of the serialized commits using a lock, which is ~750 M for 128 parallel committers. The 750 M usage may be in line with the snapshots and manifest* objects.

So, the high memory problem manifests only when using the default HadoopFileSystem.
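
For reference, the "serialized commits using a lock" baseline mentioned above could look roughly like the following minimal sketch. The class name, the single process-wide lock, and the stand-in commit body are assumptions made for illustration; in the real experiment the task body would call commit() on the corresponding AppendFiles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class SerializedCommits {
    // Hypothetical process-wide lock; a per-table lock would be a natural refinement.
    private static final ReentrantLock COMMIT_LOCK = new ReentrantLock();

    /** Runs the given commit action while holding the lock. */
    static void commitSerialized(Runnable commit) {
        COMMIT_LOCK.lock();
        try {
            commit.run();
        } finally {
            COMMIT_LOCK.unlock();
        }
    }

    /** Submits nt stand-in commits and returns the peak number that ran at once. */
    static int peakConcurrency(int nt) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(nt);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < nt; i++) {
                futures.add(pool.submit(() -> commitSerialized(() -> {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    // A real task would call appendFiles.commit() here.
                    active.decrementAndGet();
                })));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent commits: " + peakConcurrency(16));
    }
}
```

Because every commit body runs under the same lock, at most one commit (and therefore at most one set of metadata output streams) is in flight at a time, at the cost of giving up parallelism in the commit phase.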

Thanks, Mayur

PS: I had to change S3FileIO locally to accept gs:// prefix so that it works with GCS. Is there a plan to support gs:// prefix in the S3URI?

From: Ryan Blue <bl...@tabular.io>
Sent: Tuesday, November 30, 2021 3:53 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: High memory usage with highly concurrent committers

Mayur,

Is it possible to connect to this process with a profiler and look at what's taking up all of the space?

I suspect that what's happening here is that you're loading the list of snapshots for each version of metadata, so you're holding a lot of copies of the entire snapshot history and possibly caching the list of manifests for some snapshots as well.

I've thought about adding a way to avoid parsing and loading snapshots, probably by passing a cache when loading metadata so that all the copies of a table can share snapshots in memory. That would work fine because they're immutable. That might help you here, although a Snapshot instance will cache manifests after loading them if they are accessed, so you'd want to watch out for that as well.
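
The cache idea above can be illustrated with a small, self-contained Java sketch. This is not Iceberg code: Snap is a hypothetical immutable stand-in for a parsed snapshot, and SnapshotCache and getOrParse are invented names. It only shows the general technique of sharing immutable objects by id so that repeated metadata loads do not hold separate copies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

final class SnapshotCache {
    /** Hypothetical immutable stand-in for a parsed snapshot. */
    static final class Snap {
        final long id;
        final String manifestList;

        Snap(long id, String manifestList) {
            this.id = id;
            this.manifestList = manifestList;
        }
    }

    private final Map<Long, Snap> byId = new ConcurrentHashMap<>();

    /** Returns the shared instance for this id, parsing it only on first use. */
    Snap getOrParse(long snapshotId, LongFunction<Snap> parser) {
        return byId.computeIfAbsent(snapshotId, id -> parser.apply(id));
    }

    int size() {
        return byId.size();
    }

    public static void main(String[] args) {
        SnapshotCache cache = new SnapshotCache();
        Snap first = cache.getOrParse(1L, id -> new Snap(id, "manifest-list-1"));
        Snap second = cache.getOrParse(1L, id -> new Snap(id, "manifest-list-1"));
        System.out.println("same instance: " + (first == second));
    }
}
```

Sharing is safe here only because the cached objects never change after construction, which matches the immutability argument made above. Any lazily cached manifests attached to a shared snapshot would then be shared as well.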

The best step forward is to get an idea of what objects are taking up that space with a profiler or heap dump if you can.

Ryan

On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <Ma...@twosigma.com> wrote:
Hi Iceberg Community,

I’m running some experiments with high commit contention (on the same Iceberg table writing to different partitions) and I'm observing very high memory usage (5G to 7G). (Note that the data being written is very small.)

The scenario is described below:

Note1: The catalog used is similar to the JDBC catalog.
Note2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.
Note3: Iceberg code is ~6 months old. I haven’t tried the latest main branch.

Experiment params:
a. NT = 64 = number of parallel committers. Achieved using multiple threads within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough retries are done so that all committers finish successfully).
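
To make the interaction of minWait, maxWait and numRetries concrete, here is a minimal sketch of a generic capped exponential backoff. It is an assumption for illustration only: the doubling factor, the lack of jitter, and the class and method names (BackoffSketch, delayMs) are hypothetical and are not taken from Iceberg's retry code, which may compute its waits differently.

```java
// Hypothetical sketch: generic capped exponential backoff, not Iceberg's implementation.
final class BackoffSketch {

    // Wait before retry number `attempt` (1-based): minWaitMs doubled on each
    // further attempt, never exceeding maxWaitMs. The factor 2.0 is an assumption.
    static long delayMs(int attempt, long minWaitMs, long maxWaitMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double raw = minWaitMs * Math.pow(2.0, attempt - 1);
        return (long) Math.min(raw, (double) maxWaitMs);
    }

    public static void main(String[] args) {
        long minWaitMs = 100;     // one of the minWait values from the experiment
        long maxWaitMs = 10_000;  // one of the maxWait values from the experiment
        for (int attempt = 1; attempt <= 10; attempt++) {
            System.out.printf("retry %d: wait %d ms%n",
                attempt, delayMs(attempt, minWaitMs, maxWaitMs));
        }
    }
}
```

Under these assumptions the wait reaches the maxWait cap after a handful of attempts, so with 256 retries almost all retries would wait maxWait.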

Steps:

* Create an Iceberg table with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create NT different AppendFiles objects.
* Sequential write step: for 1 to NT, write 1 row (in a unique month) and append the DataFile to the corresponding AppendFiles. Basically, we create one parquet file per month (i.e. per partition) containing a single row. This is done to keep data size small for the experiment. Also, we ensure that each commit will contain a different partition.
* Parallel commit step: Create a ThreadPool of NT threads, submit a Runnable which calls AppendFiles.commit(), and get the Future. I.e. run the commits in parallel.
* Wait for all Futures to finish.
I ran this experiment with various values for params. For example, I varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, NT in (8, 16, 32, 64, 128). Code snippets can be found below.

Observations:
A. Total elapsed commit time increases with the number of committers, which is expected. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC under high concurrency.
B. The number of table metadata files is a multiple of the number of committers, e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total number of table metadata json files was 380. This is acceptable given the nature of OCC under high concurrency.
C. The memory usage keeps shooting up periodically to 7G in some experiments. This is noticeable (i.e. memory usage > 1G) when the number of concurrent committers is >= 16 and becomes worse as the number of committers increases. I’ve not investigated further, but it could be that the in-memory metadata (snapshots, etc.) is growing very large. If I serialize the commit attempts (e.g. by acquiring a lock), the high memory usage problem goes away. But I wanted to check here before trying out any alternative.
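
For reference, serializing the commit attempts with a lock (as mentioned in observation C) could look roughly like the sketch below. It uses only the JDK; the class name SerializedCommits, the process-wide lock, and the placeholder commit action are assumptions made for illustration. In the experiment, the wrapped Runnable would be the one calling commit() on the append.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: run commits from many threads, but only one at a time.
final class SerializedCommits {
    // One lock per process (an assumption); a per-table lock would also work.
    private static final ReentrantLock COMMIT_LOCK = new ReentrantLock();

    // Wraps a commit action so that concurrent callers execute it one after another.
    static Runnable serialized(Runnable commit) {
        return () -> {
            COMMIT_LOCK.lock();
            try {
                commit.run();
            } finally {
                COMMIT_LOCK.unlock();
            }
        };
    }

    // Submits nt placeholder commits and returns the highest number seen running at once.
    static int maxConcurrentCommits(int nt) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        // Placeholder for the real commit call; it only records concurrency.
        Runnable fakeCommit = () -> {
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            running.decrementAndGet();
        };
        ExecutorService pool = Executors.newFixedThreadPool(nt);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < nt; i++) {
                futures.add(pool.submit(serialized(fakeCommit)));
            }
            for (Future<?> future : futures) {
                future.get(); // surfaces any failure from the commit action
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("commit task failed", e);
        } finally {
            pool.shutdown();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent commits: " + maxConcurrentCommits(16));
    }
}
```

The trade-off is that committers queue on the lock instead of racing, which removes the retries inside this process but does not help against committers in other processes.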

Why is the concurrent commit important to us?
We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases, they end up in the high commit contention scenario described above. My main worry here is that this is happening for a single table; if we have multiple tables being committed, the memory usage will be much larger.

Questions:

1.      Have others observed this behavior? Is the high memory usage expected or am I doing something wrong? Is there any way to reduce the memory footprint (e.g. by changing some metadata config) during the commit?

2.      What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?

Thanks,
Mayur

Code snippets:

Schema schema = new Schema(
    NestedField.of(1, false, "time", TimestampType.withoutZone()),
    NestedField.of(2, false, "id", IntegerType.get()),
    NestedField.of(3, false, "value", DoubleType.get())
);

catalog.createTable(
    tableIdentifier,
    schema,
    PartitionSpec.builderFor(schema).month("time").build(),
    ImmutableMap.of(
        TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
        TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
        TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
        "write.metadata.previous-versions-max", String.valueOf(1)
    ) // properties
);

// Write data phase.
List<AppendFiles> appendFilesList = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    appendFilesList.add(table.newAppend());
}

for (int m = 0; m < NT; m++) {
    LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
    ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
    writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
}

// Commit phase.
// High memory usage starts from the commit phase.
ExecutorService executors = Executors.newFixedThreadPool(NT);
List<Future<?>> futures = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    final int i = m;
    futures.add(executors.submit(() -> {
        appendFilesList.get(i).commit();
    }));
}

for (int m = 0; m < NT; m++) {
    futures.get(m).get();
}

executors.shutdownNow();

// snippet of writeRecords().
private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
        throws IOException {
    // PartitionedWriterImpl extends PartitionedWriter<Record>
    try (var writer = new PartitionedWriterImpl(table)) {
        for (var record : records) {
            writer.write(record);
        }
        return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
    }
}

Following is the heap usage for one of the experiments where we can see very high heap usage. The initial low usage part is data writes. The high heap usage starts with the commit phase.


--
Ryan Blue
Tabular

Re: High memory usage with highly concurrent committers

Posted by Igor Dvorzhak <id...@google.com.INVALID>.
For each object being written, the GCS connector allocates ~64MiB of memory by
default to improve the performance of large object writes. If you want to
reduce memory utilization when you write many files at once, you just need to
reduce the upload chunk size, for example to 8MiB:
fs.gs.outputstream.upload.chunk.size=8388608
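
As a rough back-of-the-envelope check of what this setting means for the experiment in this thread, the sketch below multiplies the per-stream buffer by the number of output streams open at once. The assumption that each concurrent committer holds about one open stream is mine and is only an approximation; retries or streams that are not closed promptly would raise the count. The class and method names are hypothetical.

```java
// Hypothetical sketch: estimate upload buffer memory as open streams x chunk size.
final class UploadBufferEstimate {
    static final long MIB = 1024L * 1024L;

    // Total buffer bytes if every open stream allocates one chunk-sized buffer.
    static long bufferBytes(int openStreams, long chunkSizeBytes) {
        return Math.multiplyExact((long) openStreams, chunkSizeBytes);
    }

    public static void main(String[] args) {
        long defaultChunk = 64 * MIB; // approximate default mentioned above
        long reducedChunk = 8 * MIB;  // 8388608, the value in the example setting
        // Committer counts taken from the experiment; one stream per committer is assumed.
        for (int streams : new int[] {8, 16, 32, 64, 128}) {
            System.out.printf("%3d open streams: %5d MiB at 64 MiB chunks, %5d MiB at 8 MiB chunks%n",
                streams,
                bufferBytes(streams, defaultChunk) / MIB,
                bufferBytes(streams, reducedChunk) / MIB);
        }
    }
}
```

With 64 streams this gives 4 GiB of buffers at the default chunk size versus 512 MiB at 8 MiB, which is in the same range as the heap usage reported earlier in the thread.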

RE: High memory usage with highly concurrent committers

Posted by Mayur Srivastava <Ma...@twosigma.com>.
That is correct, Daniel.

I’ve tried to explain our use of S3FileIO with GCS in the “Supporting gs:// prefix …” thread.

Thanks,
Mayur

Re: High memory usage with highly concurrent committers

Posted by Daniel Weeks <da...@gmail.com>.
I feel like what Mayur was saying is that S3FileIO actually works with GCS
(it appears there is some S3 compatible API for GCS).

If that is the case, then S3FileIO can be used natively against GCS, which
wouldn't require the ResolvingFileIO (just supporting the GCS URI schemes).

This is new to me and I haven't tested this, but Mayur, if this does work,
please share how you configured S3FileIO.

-Dan

On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <ye...@gmail.com> wrote:

> We are in the process of supporting multiple file system schemes using
> ResolvingFileIO, Ryan just added the initial implementation:
> https://github.com/apache/iceberg/pull/3593
>
> -Jack
>
> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
> Mayur.Srivastava@twosigma.com> wrote:
>
>> Thanks Ryan.
>>
>>
>>
>> I’m looking at the heapdump. At a preliminary look in jvisualvm, I see
>> the following top two objects:
>>
>> 1.      ‘byte[]’: 87% of memory usage (>100k instances with a total of
>> 3.2G in one of my tests). I checked some of the references and found that
>> they come from
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader
>> <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references
>> coming from WriterBasedJsonGenerator, the finalizer
>> (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is
>> it possible that the Hadoop output streams are not closed and close is
>> called by the finalizers?
>>
>> 2.      ‘int[]’ : 12% usage (7k instances), but I can’t expand the
>> references.
>>
>>
>>
>> One interesting finding is that if I switch to S3FileIO, the high memory
>> usage goes away and the memory usage is similar to that of the serialized
>> commits using a lock, which is ~750 M for 128 parallel committers. The
>> 750 M usage may be in line with the snapshot and manifest* objects.
>>
>>
>>
>> So, the high memory problem manifests only when using the default
>> HadoopFileSystem.
>>
>>
>>
>> Thanks, Mayur
>>
>>
>>
>> PS: I had to change S3FileIO locally to accept gs:// prefix so that it
>> works with GCS. Is there a plan to support gs:// prefix in the S3URI?
>>
>>
>>
>> *From:* Ryan Blue <bl...@tabular.io>
>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>> *To:* Iceberg Dev List <de...@iceberg.apache.org>
>> *Subject:* Re: High memory usage with highly concurrent committers
>>
>>
>>
>> Mayur,
>>
>>
>>
>> Is it possible to connect to this process with a profiler and look at
>> what's taking up all of the space?
>>
>>
>>
>> I suspect that what's happening here is that you're loading the list of
>> snapshots for each version of metadata, so you're holding a lot of copies
>> of the entire snapshot history and possibly caching the list of manifests
>> for some snapshots as well.
>>
>>
>>
>> I've thought about adding a way to avoid parsing and loading snapshots,
>> probably by passing a cache when loading metadata so that all the copies of
>> a table can share snapshots in memory. That would work fine because they're
>> immutable. That might help you here, although a Snapshot instance will
>> cache manifests after loading them if they are accessed, so you'd want to
>> watch out for that as well.
>>
>>
>>
>> The best step forward is to get an idea of what objects are taking up
>> that space with a profiler or heap dump if you can.
>>
>>
>>
>> Ryan
>>
>>
>>
>> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <
>> Mayur.Srivastava@twosigma.com> wrote:
>>
>> Hi Iceberg Community,
>>
>>
>>
>> I’m running some experiments with high commit contention (on the same
>> Iceberg table writing to different partitions) and I'm observing very high
>> memory usage (5G to 7G). (Note that the data being written is very small.)
>>
>>
>>
>> *The scenario is described below:*
>>
>>
>>
>> *Note1: The catalog used is similar to the JDBC catalog.*
>>
>> *Note2: The data is stored on S3 and HadoopFileSystem is used to talk to
>> S3.*
>>
>> *Note3: Iceberg code is ~6 months old. I haven’t tried the latest main
>> branch.*
>>
>>
>>
>> *Experiment params:*
>>
>> a. NT = 64 = number of parallel committers. Achieved using multiple
>> threads within the same process.
>>
>> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>>
>> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>>
>> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure
>> enough retries are done so that all committers finish successfully).
>>
>>
>>
>> *Steps:*
>>
>>
>>
>> * *Create an Iceberg table *with three columns: time (timestamp without
>> timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>>
>> * Sequential step: create *NT different AppendFiles* objects.
>>
>> * Sequential write step: for 1 to NT, *write 1 row* (in a unique month)
>> and append the DataFile to the corresponding AppendFiles. Basically, we
>> create one parquet file per month (i.e. per partition) containing a single
>> row. This is done to keep data size small for the experiment. Also, we
>> ensure that each commit will contain a different partition.
>>
>> * *Parallel commit step*: Create a ThreadPool of NT threads, submit a
>> Runnable which calls *AppendFiles.commit()*, and get the Future. I.e. run
>> the commits in parallel.
>>
>> * Wait for all Futures to finish.
>>
>> I ran this experiment with various values for params. For example, I
>> varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, NT in
>> (8, 16, 32, 64, 128). Code snippets can be found below.
>>
>>
>>
>> *Observations:*
>>
>> A. Total elapsed commit time increases with the number of committers
>> which is expected. e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total
>> elapsed commit time is more than 250 s. This is acceptable given the nature
>> of OCC in high concurrency.
>>
>> B. The number of table metadata files is a multiple of the number of
>> committers, e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total table
>> metadata json files was 380. This is acceptable given the nature of OCC in
>> high concurrency.
>>
>> *C. The memory usage keeps shooting up periodically to 7G in some
>> experiments. This is noticeable (i.e. memory usage > 1G) when the number of
>> concurrent committers is >= 16 and becomes worse as the number of committers
>> increases. I’ve not investigated further, but it could be that the in-memory
>> metadata (snapshots, etc.) is growing very large. If I serialize the commit
>> attempts (e.g. by acquiring a lock), the high memory usage problem goes
>> away. But I wanted to check here before trying out any alternative.*
>>
>>
>>
>> *Why is the concurrent commit important to us?*
>>
>> We have several users who use various processing engines to schedule
>> their writes (into non-overlapping partitions) through a data service that
>> takes care of writing and committing the data. In many cases, they end up
>> in the high commit contention scenario as described above. My main worry
>> here is that this is happening for a single table, if we have multiple
>> tables being committed, the memory usage will be much larger.
>>
>>
>>
>> *Questions:  *
>>
>> 1.      Have others observed this behavior? Is the high memory usage
>> expected or am I doing something wrong? Is there any way to reduce the
>> memory footprint (e.g. by changing some metadata config) during the commit?
>>
>> 2.      What is the general recommendation for high concurrent
>> committers? Is high concurrent committers an anti-pattern for Iceberg?
>>
>>
>>
>> Thanks,
>>
>> Mayur
>>
>> *Code snippets:*
>>
>>
>>
>> Schema schema = new Schema(
>>
>>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>>
>>     NestedField.of(2, false, "id", IntegerType.get()),
>>
>>     NestedField.of(3, false, "value", DoubleType.get())
>>
>> );
>>
>>
>>
>> catalog.createTable(
>>
>>     tableIdentifier,
>>
>>     schema,
>>
>>     PartitionSpec.builderFor(schema).month("time").build(),
>>
>>     ImmutableMap.of(
>>
>>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>>
>>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>>
>>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>>
>>         "write.metadata.previous-versions-max", String.valueOf(1)
>>
>>     ) // properties
>>
>> );
>>
>>
>>
>> // Write data phase.
>>
>> List<AppendFiles> appendFilesList = new ArrayList<>();
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     appendFilesList.add(table.newAppend());
>>
>> }
>>
>>
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m,
>> ChronoUnit.MONTHS);
>>
>>     ImmutableList<GenericRecord> records =
>> ImmutableList.of(createRecord(schema, time, 1, 10.0));
>>
>>     writeRecords(table,
>> records).forEach(appendFilesList.get(m)::appendFile);
>>
>> }
>>
>>
>>
>> // Commit phase.
>>
>> // High memory usage starts from the commit phase.
>>
>> ExecutorService executors = Executors.newFixedThreadPool(NT);
>>
>> List<Future<?>> futures = new ArrayList<>();
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     final int i = m;
>>
>>     futures.add(executors.submit(() -> {
>>
>>         appendFilesList.get(i).commit();
>>
>>     }));
>>
>> }
>>
>>
>>
>> for (int m = 0; m < N; m++) {
>>
>>     futures.get(m).get();
>>
>> }
>>
>>
>>
>> executors.shutdownNow();
>>
>>
>>
>> // snippet of writeRecords().
>>
>> private static List<DataFile> writeRecords(Table table,
>> List<GenericRecord> records)
>>
>>         throws IOException {
>>
>>     // PartitionedWriterImpl extends extends PartitionedWriter<Record>
>>
>>     try (var writer = new PartitionedWriterImpl(table)) {
>>
>>         for (var record : records) {
>>
>>             writer.write(record);
>>
>>         }
>>
>>         return
>> Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>>
>>     }
>>
>> }
>>
>>
>>
>> Following is the heap usage for one of the experiments where we can see
>> very high heap usage. The initial low usage part is data writes. The high
>> heap usage starts with the commit phase.
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Tabular
>>
>

Re: High memory usage with highly concurrent committers

Posted by Jack Ye <ye...@gmail.com>.
We are in the process of supporting multiple file system schemes using
ResolvingFileIO. Ryan just added the initial implementation:
https://github.com/apache/iceberg/pull/3593

-Jack

On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
Mayur.Srivastava@twosigma.com> wrote:

> PS: I had to change S3FileIO locally to accept gs:// prefix so that it
> works with GCS. Is there a plan to support gs:// prefix in the S3URI?

RE: High memory usage with highly concurrent committers

Posted by Mayur Srivastava <Ma...@twosigma.com>.
Thanks Ryan.

I’m looking at the heap dump. On a preliminary look in jvisualvm, I see the following top two objects:

1.      ‘byte[]’ : 87% of memory usage (>100k instances with a total of 3.2G in one of my tests). I checked some of the references and found that they come from com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references coming from WriterBasedJsonGenerator, finalizer (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is it possible that the Hadoop output streams are not closed explicitly and close is only called by the finalizers?

2.      ‘int[]’ : 12% usage (7k instances), but I can’t expand the references.
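
To make the question about finalizers concrete, here is a minimal sketch of a stream that releases its buffer deterministically when closed with try-with-resources, as opposed to waiting for a finalizer. BufferedSink is a hypothetical stand-in for an output stream holding a large upload buffer; it is not an Iceberg, Hadoop, or GCS connector class, and nothing here confirms how those libraries actually behave.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an output stream that holds a large upload buffer.
final class BufferedSink implements AutoCloseable {
    // Total bytes currently held by open sinks (for illustration only).
    static final AtomicLong LIVE_BYTES = new AtomicLong();
    private byte[] buffer;

    BufferedSink(int bufferSize) {
        buffer = new byte[bufferSize];
        LIVE_BYTES.addAndGet(bufferSize);
    }

    void write(byte b) {
        if (buffer == null) {
            throw new IllegalStateException("sink is closed");
        }
        buffer[0] = b;
    }

    @Override
    public void close() {
        // Release the buffer as soon as the caller is done with the sink.
        if (buffer != null) {
            LIVE_BYTES.addAndGet(-buffer.length);
            buffer = null;
        }
    }
}

final class CloseDemo {
    // Opens and closes n sinks one after another; returns the bytes still held.
    static long writeAndClose(int n, int bufferSize) {
        for (int i = 0; i < n; i++) {
            try (BufferedSink sink = new BufferedSink(bufferSize)) {
                sink.write((byte) i);
            }
        }
        return BufferedSink.LIVE_BYTES.get();
    }
}
```

With explicit close, no buffer outlives its try block. If close were left to finalizers, the buffers would stay reachable until the garbage collector ran them, which would be consistent with periodic spikes, although that is only a hypothesis here.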

One interesting finding is that if I switch to S3FileIO, the high memory usage goes away and the memory usage is similar to that of commits serialized with a lock, which is ~750 M for 128 parallel committers. The 750 M usage may be in line with the size of the snapshot and manifest* objects.

So, the high memory problem manifests only when using the default HadoopFileSystem.
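
For anyone who wants to try the same switch, a minimal sketch of selecting the FileIO implementation through catalog properties follows. The "io-impl" key and the org.apache.iceberg.aws.s3.S3FileIO class name are Iceberg's; whether a custom catalog such as the one used in this experiment honors that property is an assumption, and FileIoConfig is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds catalog properties selecting S3FileIO.
final class FileIoConfig {
    // Iceberg catalog property that names the FileIO implementation class.
    static final String IO_IMPL = "io-impl";

    static Map<String, String> withS3FileIO(Map<String, String> base) {
        Map<String, String> props = new HashMap<>(base);
        props.put(IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
        return props;
    }
}
```

The resulting map would typically be passed to the catalog when it is initialized, alongside whatever other properties the catalog already needs.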

Thanks, Mayur

PS: I had to change S3FileIO locally to accept the gs:// prefix so that it works with GCS. Is there a plan to support the gs:// prefix in S3URI?

From: Ryan Blue <bl...@tabular.io>
Sent: Tuesday, November 30, 2021 3:53 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: High memory usage with highly concurrent committers

Mayur,

Is it possible to connect to this process with a profiler and look at what's taking up all of the space?

I suspect that what's happening here is that you're loading the list of snapshots for each version of metadata, so you're holding a lot of copies of the entire snapshot history and possibly caching the list of manifests for some snapshots as well.

I've thought about adding a way to avoid parsing and loading snapshots, probably by passing a cache when loading metadata so that all the copies of a table can share snapshots in memory. That would work fine because they're immutable. That might help you here, although a Snapshot instance will cache manifests after loading them if they are accessed, so you'd want to watch out for that as well.

The best step forward is to get an idea of what objects are taking up that space with a profiler or heap dump if you can.

Ryan

On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <Ma...@twosigma.com> wrote:
Hi Iceberg Community,

I’m running some experiments with high commit contention (on the same Iceberg table writing to different partitions) and I'm observing very high memory usage (5G to 7G). (Note that the data being written is very small.)

The scenario is described below:

Note1: The catalog used is similar to the JDBC catalog.
Note2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.
Note3: Iceberg code is ~6 months old. I haven’t tried the latest main branch.

Experiment params:
a. NT = 64 = number of parallel committers. Achieved using multiple threads within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough retries are done so that all committers finish successfully).
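
As a rough illustration of how these parameters interact, the sketch below computes a per-attempt wait assuming exponential backoff that doubles from minWait and is capped at maxWait. The doubling factor and the absence of jitter are assumptions made for illustration, and RetryBackoff is a hypothetical helper, not Iceberg's retry code.

```java
// Hypothetical helper: capped exponential backoff (factor 2, no jitter assumed).
final class RetryBackoff {
    // attempt is 1-based: the first retry waits minWaitMs.
    static long waitMs(int attempt, long minWaitMs, long maxWaitMs) {
        double uncapped = minWaitMs * Math.pow(2.0, attempt - 1);
        return (long) Math.min(uncapped, (double) maxWaitMs);
    }
}
```

Under these assumptions, minWait = 100 ms and maxWait = 10 s reach the cap on the 8th attempt, so most of a long retry sequence waits the full maxWait between attempts.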

Steps:

* Create an Iceberg table with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create NT different AppendFiles objects.
* Sequential write step: for 1 to NT, write 1 row (in a unique month) and append the DataFile to the corresponding AppendFiles object. Basically, we create one parquet file per month (i.e. per partition) containing a single row. This is done to keep data size small for the experiment. Also, we ensure that each commit will contain a different partition.
* Parallel commit step: Create a ThreadPool of NT threads, submit a Runnable which calls AppendFiles.commit(), and get the Future. I.e. run the commits in parallel.
* Wait for all Futures to finish.
I ran this experiment with various values for params. For example, I varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, NT in (8, 16, 32, 64, 128). Code snippets can be found below.

Observations:
A. Total elapsed commit time increases with the number of committers which is expected. e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC in high concurrency.
B. The number of table metadata files is a multiple of the number of committers, e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total table metadata json files was 380. This is acceptable given the nature of OCC in high concurrency.
C. Memory usage keeps shooting up periodically to 7G in some experiments. This is noticeable (i.e. memory usage > 1G) when the number of concurrent committers is >= 16 and becomes worse as the number of committers increases. I’ve not investigated further, but it could be that the in-memory metadata (snapshots, etc.) is growing very large. If I serialize the commit attempts (e.g. by acquiring a lock), the high memory usage problem goes away. But I wanted to check here before trying out any alternative.
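
The lock-based workaround mentioned in observation C can be sketched as follows. SerializedCommitter and SerializedCommitDemo are hypothetical names; the Runnable passed to commit() stands in for a call such as appendFiles.commit(), and the demo only measures how many commit actions ever overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper: lets only one commit action run at a time.
final class SerializedCommitter {
    private final ReentrantLock lock = new ReentrantLock();

    // commitAction stands in for a call such as appendFiles.commit().
    void commit(Runnable commitAction) {
        lock.lock();
        try {
            commitAction.run();
        } finally {
            lock.unlock();
        }
    }
}

final class SerializedCommitDemo {
    // Runs `threads` commits through one SerializedCommitter and returns the
    // highest number of commit actions observed running at the same time.
    static int maxConcurrentCommits(int threads) {
        SerializedCommitter committer = new SerializedCommitter();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> committer.commit(() -> {
                    int now = active.incrementAndGet();
                    maxActive.accumulateAndGet(now, Math::max);
                    active.decrementAndGet();
                })));
            }
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return maxActive.get();
    }
}
```

One lock per table would be enough for this scenario, since the contention is on a single table's metadata. The trade-off is that commits queue inside the process instead of retrying against the catalog.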

Why is the concurrent commit important to us?
We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases, they end up in the high commit contention scenario described above. My main worry is that this is happening for a single table; if we have multiple tables being committed, the memory usage will be much larger.

Questions:

1.      Have others observed this behavior? Is the high memory usage expected or am I doing something wrong? Is there any way to reduce the memory footprint (e.g. by changing some metadata config) during the commit?

2.      What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?

Thanks,
Mayur
Code snippets:

Schema schema = new Schema(
    NestedField.of(1, false, "time", TimestampType.withoutZone()),
    NestedField.of(2, false, "id", IntegerType.get()),
    NestedField.of(3, false, "value", DoubleType.get())
);

Table table = catalog.createTable(
    tableIdentifier,
    schema,
    PartitionSpec.builderFor(schema).month("time").build(),
    ImmutableMap.of(
        TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
        TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
        TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
        "write.metadata.previous-versions-max", String.valueOf(1)
    ) // properties
);

// Write data phase.
List<AppendFiles> appendFilesList = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    appendFilesList.add(table.newAppend());
}

for (int m = 0; m < NT; m++) {
    LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
    ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
    writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
}

// Commit phase.
// High memory usage starts from the commit phase.
ExecutorService executors = Executors.newFixedThreadPool(NT);
List<Future<?>> futures = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    final int i = m;
    futures.add(executors.submit(() -> {
        appendFilesList.get(i).commit();
    }));
}

for (int m = 0; m < NT; m++) {
    futures.get(m).get();
}

executors.shutdownNow();

// snippet of writeRecords().
private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
        throws IOException {
    // PartitionedWriterImpl extends PartitionedWriter<Record>
    try (var writer = new PartitionedWriterImpl(table)) {
        for (var record : records) {
            writer.write(record);
        }
        return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
    }
}

Following is the heap usage for one of the experiments where we can see very high heap usage. The initial low usage part is data writes. The high heap usage starts with the commit phase.


--
Ryan Blue
Tabular

Re: High memory usage with highly concurrent committers

Posted by Ryan Blue <bl...@tabular.io>.
Mayur,

Is it possible to connect to this process with a profiler and look at
what's taking up all of the space?

I suspect that what's happening here is that you're loading the list of
snapshots for each version of metadata, so you're holding a lot of copies
of the entire snapshot history and possibly caching the list of manifests
for some snapshots as well.

I've thought about adding a way to avoid parsing and loading snapshots,
probably by passing a cache when loading metadata so that all the copies of
a table can share snapshots in memory. That would work fine because they're
immutable. That might help you here, although a Snapshot instance will
cache manifests after loading them if they are accessed, so you'd want to
watch out for that as well.
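
A minimal sketch of that sharing idea, under stated assumptions: snapshots are interned by ID in a cache so that every copy of the table metadata refers to the same immutable instance. SnapshotInfo and SnapshotCache are hypothetical types for illustration, not Iceberg classes, and the parser argument stands in for whatever parses a snapshot from metadata JSON.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical immutable snapshot record (not Iceberg's Snapshot interface).
final class SnapshotInfo {
    final long snapshotId;
    final String manifestListLocation;

    SnapshotInfo(long snapshotId, String manifestListLocation) {
        this.snapshotId = snapshotId;
        this.manifestListLocation = manifestListLocation;
    }
}

// Hypothetical cache shared by all metadata loads of one table.
final class SnapshotCache {
    private final Map<Long, SnapshotInfo> byId = new ConcurrentHashMap<>();

    // Returns the cached instance, parsing only on the first request for an ID.
    SnapshotInfo getOrLoad(long snapshotId, LongFunction<SnapshotInfo> parser) {
        return byId.computeIfAbsent(snapshotId, id -> parser.apply(id));
    }

    int size() {
        return byId.size();
    }
}
```

Sharing is safe only because the cached objects are immutable. Any lazily loaded state, such as a cached manifest list, would be shared too and would need its own bound.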

The best step forward is to get an idea of what objects are taking up that
space with a profiler or heap dump if you can.
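
If attaching a profiler is awkward, heap usage can also be sampled from inside the process, and a heap dump written on demand. The sketch below uses the standard java.lang.management API and the HotSpot-specific HotSpotDiagnosticMXBean; HeapProbe is a hypothetical helper name, and the dump call assumes a HotSpot-based JVM.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper for sampling heap usage and writing a heap dump.
final class HeapProbe {
    // Bytes of heap currently in use, as reported by the JVM.
    static long usedHeapBytes() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed();
    }

    // Writes a heap dump to the given path (HotSpot JVMs only).
    // The target file must not already exist; recent JDKs also expect
    // the file name to end in ".hprof".
    static void dumpHeap(String path) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(path, true); // true = dump only live (reachable) objects
    }
}
```

Logging usedHeapBytes() before and after the commit phase, and calling dumpHeap() near a peak, would give a dump that can be opened in jvisualvm or a similar tool.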

Ryan


-- 
Ryan Blue
Tabular