You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@iceberg.apache.org by Thippana Vamsi Kalyan <va...@dremio.com> on 2022/02/19 13:29:27 UTC

Concurrent transactions

Hi,

I am trying to understand the usage of Transactions in Iceberg with
"commit.retry.num-retries" set to zero. My requirement is that the
transaction must fail if the table gets updated by any concurrent
transaction after opening the transaction.

I wrote the following unit test in TestHadoopTables.java to verify the
behaviour. I am noticing that both transactions are committing one after
the other leading to an unexpected table state. Could anyone please confirm
if I am doing anything wrong, or whether Iceberg transaction commit logic
needs any change?

This test is very simple. It opens two transactions one after another, adds
a file as part of the transaction, and commits them one after the other. My
requirement is that the second transaction must fail with
CommitFailedException. But, it is successfully committing.

@Test
  public void testSimpleConcurrentTransaction() {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
            .build();

    // set table property to avoid retries during commit
    final Map<String, String> tableProperties = Stream.of(new String[][] {
            { TableProperties.COMMIT_NUM_RETRIES, "0"
}}).collect(Collectors.toMap(d->d[0], d->d[1]));

    final DataFile FILE_A = DataFiles.builder(spec)
            .withPath("/path/to/data-a.parquet")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();

    Table table = TABLES.create(SCHEMA, spec, tableProperties,
tableDir.toURI().toString());

    // It is an empty table, so there is no snapshot yet
    Assert.assertEquals("Current snapshot must be null", null,
table.currentSnapshot());

    // start transaction t1
    Transaction t1 = table.newTransaction();

    // start transaction t2
    Transaction t2 = table.newTransaction();

    // t1 is adding a data file
    t1.newAppend()
            .appendFile(FILE_A)
            .commit();

    // t2 is adding a data file
    t2.newAppend()
            .appendFile(FILE_A)
            .commit();

    // commit transaction t1
    t1.commitTransaction();

    // commit transaction t2: My requirement is that the following commit
must fail
    t2.commitTransaction();

    table.refresh();
    List<ManifestFile> manifests = table.currentSnapshot().allManifests();

    // Following assert fails since both transaction added one each
manifest file
    Assert.assertEquals("Should have 1 manifest file", 1, manifests.size());
  }

Please suggest whether there is a way to commit transactions such that the
second one fails. Thank you so much.

Re: Concurrent transactions

Posted by Ryan Blue <bl...@tabular.io>.
The problem with that approach is that you're not actually running
validations. It's fine if you want to do it that way in your environment,
this is why we make it easy to plug in your own `TableOperations`. But I
wouldn't recommend it or add that to the Iceberg library.

On Sat, Feb 19, 2022 at 4:35 PM Thippana Vamsi Kalyan <va...@dremio.com>
wrote:

> Thanks Russell and Ryan for clarification on Transaction usage.
>
> What do you think about passing a special table ops that does not refresh
> from catalog unless a commit was called on it?
> So, that way the base in Transaction never moves and during commit a
> conflict will be found, and then because my num-retries is zero, commit
> fails.
>
> Do you see any issues in that approach?
>
> On Sun, Feb 20, 2022 at 2:46 AM Ryan Blue <bl...@tabular.io> wrote:
>
>> The problem that you’re hitting is that the initial commit succeeds. It
>> doesn’t have anything to do with retries. The commit is started when you
>> call commitTransaction not when you create the transaction.
>>
>> That’s because transactions aren’t watching the table state from the
>> transaction start. Instead, requirements for table state are added by the
>> individual operations. For example, the MERGE INTO command will configure
>> the overwrite it uses to validate that there are no changes after the
>> starting snapshot ID that was read. Like Russell said, that’s not done
>> automatically by the transaction because there’s no way for the Transaction
>> to see the table reads.
>>
>> What you probably want to do is to configure validation for the
>> operations that you’re using. Here’s an example from merge
>> <https://github.com/apache/iceberg/blob/578e35c3b8c0c6c8642be38de0f7813dcd95cf68/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L371>
>> that configures validation by:
>>
>>    - Setting the conflict detection filter, which is what was scanned as
>>    part of the operation
>>    - Requesting validation that no data files have been added that match
>>    the filter
>>    - Requesting validation that no delete files (or file deletes) have
>>    been added that match the filter
>>
>> Ryan
>>
>> On Sat, Feb 19, 2022 at 8:45 AM Thippana Vamsi Kalyan <va...@dremio.com>
>> wrote:
>>
>>> you basically get 1 retry for free off the bat,
>>>>
>>>
>>> Yes, I am getting this free retry even though I am setting
>>> "commit.retry.num-retries" to zero
>>>
>>> 2) Creating a transaction that wraps a base transaction and that
>>>> overrides commitTransaction to just fail if the metadata from when the
>>>> object was created is not the same as the metadata refreshed from the
>>>> operations when commit is called. Note: This would also mean that any
>>>> operation (not just other transactions) would fail the transaction.
>>>>
>>>
>>> Yes, my requirement is to let only one write happen to the table. So,
>>> overriding BaseTransaction and doing the transaction commit myself is an
>>> option. I am hoping that I could avoid custom transaction logic in our
>>> application, and contribute an acceptable solution to open source.
>>>
>>> Will it be an acceptable change that honours "commit.retry.num-retries"
>>> across multiple commit calls on the same update?
>>> Or, is there anything better we can do to respect the 'noRetry' setting.
>>> Currently the transaction is not supporting the noRetry case. It always
>>> retries at least once.
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Feb 19, 2022 at 9:05 PM Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>> The issue with the above example is that starting a transaction is not
>>>> a shared state amongst iceberg clients or the table itself. There is
>>>> nothing that other clients could know about or check. If you look at the
>>>> code for committing a transaction, it starts building the transaction off
>>>> of whatever snapshot was available *at commit
>>>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>time
>>>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>,*.
>>>> This means when you call t2.commitTransaction, it checks to see what the
>>>> latest snapshot is and gets back t1's commit. Now if t2 had attempted to
>>>> commit *concurrently with t1 *then only one of them would have
>>>> succeeded and the other would have failed.
>>>>
>>>> Since we always refresh before trying to build the commit you basically
>>>> get 1 retry for free off the bat, so unless the commits inside the
>>>> transaction are not compatible with the current metadata you are good to go
>>>> without any retries. I think if we changed to logic to attempt to refresh
>>>> only after we fail to commit here
>>>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L377> we
>>>> would get the effect you are looking for but I'm not sure this would be
>>>> generically useful since I think most folks would like non-conflicting
>>>> transactions to not require a retry.
>>>>
>>>> So for your use case I think maybe there are 2 solutions:
>>>> 1) Make sure all commits contain some operation that would conflict if
>>>> another transaction succeeded first. This seems like a difficult
>>>> proposition to me
>>>> 2) Creating a transaction that wraps a base transaction and that
>>>> overrides commitTransaction to just fail if the metadata from when the
>>>> object was created is not the same as the metadata refreshed from the
>>>> operations when commit is called. Note: This would also mean that any
>>>> operation (not just other transactions) would fail the transaction.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Feb 19, 2022 at 7:29 AM Thippana Vamsi Kalyan <va...@dremio.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to understand the usage of Transactions in Iceberg with
>>>>> "commit.retry.num-retries" set to zero. My requirement is that the
>>>>> transaction must fail if the table gets updated by any concurrent
>>>>> transaction after opening the transaction.
>>>>>
>>>>> I wrote the following unit test in TestHadoopTables.java to verify the
>>>>> behaviour. I am noticing that both transactions are committing one after
>>>>> the other leading to an unexpected table state. Could anyone please confirm
>>>>> if I am doing anything wrong, or whether Iceberg transaction commit logic
>>>>> needs any change?
>>>>>
>>>>> This test is very simple. It opens two transactions one after another,
>>>>> adds a file as part of the transaction, and commits them one after the
>>>>> other. My requirement is that the second transaction must fail with
>>>>> CommitFailedException. But, it is successfully committing.
>>>>>
>>>>> @Test
>>>>>   public void testSimpleConcurrentTransaction() {
>>>>>     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>>>>>             .build();
>>>>>
>>>>>     // set table property to avoid retries during commit
>>>>>     final Map<String, String> tableProperties = Stream.of(new
>>>>> String[][] {
>>>>>             { TableProperties.COMMIT_NUM_RETRIES, "0"
>>>>> }}).collect(Collectors.toMap(d->d[0], d->d[1]));
>>>>>
>>>>>     final DataFile FILE_A = DataFiles.builder(spec)
>>>>>             .withPath("/path/to/data-a.parquet")
>>>>>             .withFileSizeInBytes(10)
>>>>>             .withRecordCount(1)
>>>>>             .build();
>>>>>
>>>>>     Table table = TABLES.create(SCHEMA, spec, tableProperties,
>>>>> tableDir.toURI().toString());
>>>>>
>>>>>     // It is an empty table, so there is no snapshot yet
>>>>>     Assert.assertEquals("Current snapshot must be null", null,
>>>>> table.currentSnapshot());
>>>>>
>>>>>     // start transaction t1
>>>>>     Transaction t1 = table.newTransaction();
>>>>>
>>>>>     // start transaction t2
>>>>>     Transaction t2 = table.newTransaction();
>>>>>
>>>>>     // t1 is adding a data file
>>>>>     t1.newAppend()
>>>>>             .appendFile(FILE_A)
>>>>>             .commit();
>>>>>
>>>>>     // t2 is adding a data file
>>>>>     t2.newAppend()
>>>>>             .appendFile(FILE_A)
>>>>>             .commit();
>>>>>
>>>>>     // commit transaction t1
>>>>>     t1.commitTransaction();
>>>>>
>>>>>     // commit transaction t2: My requirement is that the following
>>>>> commit must fail
>>>>>     t2.commitTransaction();
>>>>>
>>>>>     table.refresh();
>>>>>     List<ManifestFile> manifests =
>>>>> table.currentSnapshot().allManifests();
>>>>>
>>>>>     // Following assert fails since both transaction added one each
>>>>> manifest file
>>>>>     Assert.assertEquals("Should have 1 manifest file", 1,
>>>>> manifests.size());
>>>>>   }
>>>>>
>>>>> Please suggest whether there is a way to commit transactions such that
>>>>> the second one fails. Thank you so much.
>>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>

-- 
Ryan Blue
Tabular

Re: Concurrent transactions

Posted by Thippana Vamsi Kalyan <va...@dremio.com>.
Thanks Russell and Ryan for clarification on Transaction usage.

What do you think about passing a special table ops that does not refresh
from catalog unless a commit was called on it?
So, that way the base in Transaction never moves and during commit a
conflict will be found, and then because my num-retries is zero, commit
fails.

Do you see any issues in that approach?

On Sun, Feb 20, 2022 at 2:46 AM Ryan Blue <bl...@tabular.io> wrote:

> The problem that you’re hitting is that the initial commit succeeds. It
> doesn’t have anything to do with retries. The commit is started when you
> call commitTransaction not when you create the transaction.
>
> That’s because transactions aren’t watching the table state from the
> transaction start. Instead, requirements for table state are added by the
> individual operations. For example, the MERGE INTO command will configure
> the overwrite it uses to validate that there are no changes after the
> starting snapshot ID that was read. Like Russell said, that’s not done
> automatically by the transaction because there’s no way for the Transaction
> to see the table reads.
>
> What you probably want to do is to configure validation for the operations
> that you’re using. Here’s an example from merge
> <https://github.com/apache/iceberg/blob/578e35c3b8c0c6c8642be38de0f7813dcd95cf68/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L371>
> that configures validation by:
>
>    - Setting the conflict detection filter, which is what was scanned as
>    part of the operation
>    - Requesting validation that no data files have been added that match
>    the filter
>    - Requesting validation that no delete files (or file deletes) have
>    been added that match the filter
>
> Ryan
>
> On Sat, Feb 19, 2022 at 8:45 AM Thippana Vamsi Kalyan <va...@dremio.com>
> wrote:
>
>> you basically get 1 retry for free off the bat,
>>>
>>
>> Yes, I am getting this free retry even though I am setting
>> "commit.retry.num-retries" to zero
>>
>> 2) Creating a transaction that wraps a base transaction and that
>>> overrides commitTransaction to just fail if the metadata from when the
>>> object was created is not the same as the metadata refreshed from the
>>> operations when commit is called. Note: This would also mean that any
>>> operation (not just other transactions) would fail the transaction.
>>>
>>
>> Yes, my requirement is to let only one write happen to the table. So,
>> overriding BaseTransaction and doing the transaction commit myself is an
>> option. I am hoping that I could avoid custom transaction logic in our
>> application, and contribute an acceptable solution to open source.
>>
>> Will it be an acceptable change that honours "commit.retry.num-retries"
>> across multiple commit calls on the same update?
>> Or, is there anything better we can do to respect the 'noRetry' setting.
>> Currently the transaction is not supporting the noRetry case. It always
>> retries at least once.
>>
>>
>>
>>
>>
>> On Sat, Feb 19, 2022 at 9:05 PM Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>> The issue with the above example is that starting a transaction is not a
>>> shared state amongst iceberg clients or the table itself. There is nothing
>>> that other clients could know about or check. If you look at the code for
>>> committing a transaction, it starts building the transaction off of
>>> whatever snapshot was available *at commit
>>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>time
>>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>,*.
>>> This means when you call t2.commitTransaction, it checks to see what the
>>> latest snapshot is and gets back t1's commit. Now if t2 had attempted to
>>> commit *concurrently with t1 *then only one of them would have
>>> succeeded and the other would have failed.
>>>
>>> Since we always refresh before trying to build the commit you basically
>>> get 1 retry for free off the bat, so unless the commits inside the
>>> transaction are not compatible with the current metadata you are good to go
>>> without any retries. I think if we changed to logic to attempt to refresh
>>> only after we fail to commit here
>>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L377> we
>>> would get the effect you are looking for but I'm not sure this would be
>>> generically useful since I think most folks would like non-conflicting
>>> transactions to not require a retry.
>>>
>>> So for your use case I think maybe there are 2 solutions:
>>> 1) Make sure all commits contain some operation that would conflict if
>>> another transaction succeeded first. This seems like a difficult
>>> proposition to me
>>> 2) Creating a transaction that wraps a base transaction and that
>>> overrides commitTransaction to just fail if the metadata from when the
>>> object was created is not the same as the metadata refreshed from the
>>> operations when commit is called. Note: This would also mean that any
>>> operation (not just other transactions) would fail the transaction.
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Feb 19, 2022 at 7:29 AM Thippana Vamsi Kalyan <va...@dremio.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to understand the usage of Transactions in Iceberg with
>>>> "commit.retry.num-retries" set to zero. My requirement is that the
>>>> transaction must fail if the table gets updated by any concurrent
>>>> transaction after opening the transaction.
>>>>
>>>> I wrote the following unit test in TestHadoopTables.java to verify the
>>>> behaviour. I am noticing that both transactions are committing one after
>>>> the other leading to an unexpected table state. Could anyone please confirm
>>>> if I am doing anything wrong, or whether Iceberg transaction commit logic
>>>> needs any change?
>>>>
>>>> This test is very simple. It opens two transactions one after another,
>>>> adds a file as part of the transaction, and commits them one after the
>>>> other. My requirement is that the second transaction must fail with
>>>> CommitFailedException. But, it is successfully committing.
>>>>
>>>> @Test
>>>>   public void testSimpleConcurrentTransaction() {
>>>>     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>>>>             .build();
>>>>
>>>>     // set table property to avoid retries during commit
>>>>     final Map<String, String> tableProperties = Stream.of(new
>>>> String[][] {
>>>>             { TableProperties.COMMIT_NUM_RETRIES, "0"
>>>> }}).collect(Collectors.toMap(d->d[0], d->d[1]));
>>>>
>>>>     final DataFile FILE_A = DataFiles.builder(spec)
>>>>             .withPath("/path/to/data-a.parquet")
>>>>             .withFileSizeInBytes(10)
>>>>             .withRecordCount(1)
>>>>             .build();
>>>>
>>>>     Table table = TABLES.create(SCHEMA, spec, tableProperties,
>>>> tableDir.toURI().toString());
>>>>
>>>>     // It is an empty table, so there is no snapshot yet
>>>>     Assert.assertEquals("Current snapshot must be null", null,
>>>> table.currentSnapshot());
>>>>
>>>>     // start transaction t1
>>>>     Transaction t1 = table.newTransaction();
>>>>
>>>>     // start transaction t2
>>>>     Transaction t2 = table.newTransaction();
>>>>
>>>>     // t1 is adding a data file
>>>>     t1.newAppend()
>>>>             .appendFile(FILE_A)
>>>>             .commit();
>>>>
>>>>     // t2 is adding a data file
>>>>     t2.newAppend()
>>>>             .appendFile(FILE_A)
>>>>             .commit();
>>>>
>>>>     // commit transaction t1
>>>>     t1.commitTransaction();
>>>>
>>>>     // commit transaction t2: My requirement is that the following
>>>> commit must fail
>>>>     t2.commitTransaction();
>>>>
>>>>     table.refresh();
>>>>     List<ManifestFile> manifests =
>>>> table.currentSnapshot().allManifests();
>>>>
>>>>     // Following assert fails since both transaction added one each
>>>> manifest file
>>>>     Assert.assertEquals("Should have 1 manifest file", 1,
>>>> manifests.size());
>>>>   }
>>>>
>>>> Please suggest whether there is a way to commit transactions such that
>>>> the second one fails. Thank you so much.
>>>>
>>>
>
> --
> Ryan Blue
> Tabular
>

Re: Concurrent transactions

Posted by Ryan Blue <bl...@tabular.io>.
The problem that you’re hitting is that the initial commit succeeds. It
doesn’t have anything to do with retries. The commit is started when you
call commitTransaction not when you create the transaction.

That’s because transactions aren’t watching the table state from the
transaction start. Instead, requirements for table state are added by the
individual operations. For example, the MERGE INTO command will configure
the overwrite it uses to validate that there are no changes after the
starting snapshot ID that was read. Like Russell said, that’s not done
automatically by the transaction because there’s no way for the Transaction
to see the table reads.

What you probably want to do is to configure validation for the operations
that you’re using. Here’s an example from merge
<https://github.com/apache/iceberg/blob/578e35c3b8c0c6c8642be38de0f7813dcd95cf68/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L371>
that configures validation by:

   - Setting the conflict detection filter, which is what was scanned as
   part of the operation
   - Requesting validation that no data files have been added that match
   the filter
   - Requesting validation that no delete files (or file deletes) have been
   added that match the filter

Ryan

On Sat, Feb 19, 2022 at 8:45 AM Thippana Vamsi Kalyan <va...@dremio.com>
wrote:

> you basically get 1 retry for free off the bat,
>>
>
> Yes, I am getting this free retry even though I am setting
> "commit.retry.num-retries" to zero
>
> 2) Creating a transaction that wraps a base transaction and that overrides
>> commitTransaction to just fail if the metadata from when the object was
>> created is not the same as the metadata refreshed from the operations when
>> commit is called. Note: This would also mean that any operation (not just
>> other transactions) would fail the transaction.
>>
>
> Yes, my requirement is to let only one write happen to the table. So,
> overriding BaseTransaction and doing the transaction commit myself is an
> option. I am hoping that I could avoid custom transaction logic in our
> application, and contribute an acceptable solution to open source.
>
> Will it be an acceptable change that honours "commit.retry.num-retries"
> across multiple commit calls on the same update?
> Or, is there anything better we can do to respect the 'noRetry' setting.
> Currently the transaction is not supporting the noRetry case. It always
> retries at least once.
>
>
>
>
>
> On Sat, Feb 19, 2022 at 9:05 PM Russell Spitzer <ru...@gmail.com>
> wrote:
>
>> The issue with the above example is that starting a transaction is not a
>> shared state amongst iceberg clients or the table itself. There is nothing
>> that other clients could know about or check. If you look at the code for
>> committing a transaction, it starts building the transaction off of
>> whatever snapshot was available *at commit
>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>time
>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>,*.
>> This means when you call t2.commitTransaction, it checks to see what the
>> latest snapshot is and gets back t1's commit. Now if t2 had attempted to
>> commit *concurrently with t1 *then only one of them would have succeeded
>> and the other would have failed.
>>
>> Since we always refresh before trying to build the commit you basically
>> get 1 retry for free off the bat, so unless the commits inside the
>> transaction are not compatible with the current metadata you are good to go
>> without any retries. I think if we changed to logic to attempt to refresh
>> only after we fail to commit here
>> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L377> we
>> would get the effect you are looking for but I'm not sure this would be
>> generically useful since I think most folks would like non-conflicting
>> transactions to not require a retry.
>>
>> So for your use case I think maybe there are 2 solutions:
>> 1) Make sure all commits contain some operation that would conflict if
>> another transaction succeeded first. This seems like a difficult
>> proposition to me
>> 2) Creating a transaction that wraps a base transaction and that
>> overrides commitTransaction to just fail if the metadata from when the
>> object was created is not the same as the metadata refreshed from the
>> operations when commit is called. Note: This would also mean that any
>> operation (not just other transactions) would fail the transaction.
>>
>>
>>
>>
>>
>> On Sat, Feb 19, 2022 at 7:29 AM Thippana Vamsi Kalyan <va...@dremio.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to understand the usage of Transactions in Iceberg with
>>> "commit.retry.num-retries" set to zero. My requirement is that the
>>> transaction must fail if the table gets updated by any concurrent
>>> transaction after opening the transaction.
>>>
>>> I wrote the following unit test in TestHadoopTables.java to verify the
>>> behaviour. I am noticing that both transactions are committing one after
>>> the other leading to an unexpected table state. Could anyone please confirm
>>> if I am doing anything wrong, or whether Iceberg transaction commit logic
>>> needs any change?
>>>
>>> This test is very simple. It opens two transactions one after another,
>>> adds a file as part of the transaction, and commits them one after the
>>> other. My requirement is that the second transaction must fail with
>>> CommitFailedException. But, it is successfully committing.
>>>
>>> @Test
>>>   public void testSimpleConcurrentTransaction() {
>>>     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>>>             .build();
>>>
>>>     // set table property to avoid retries during commit
>>>     final Map<String, String> tableProperties = Stream.of(new String[][]
>>> {
>>>             { TableProperties.COMMIT_NUM_RETRIES, "0"
>>> }}).collect(Collectors.toMap(d->d[0], d->d[1]));
>>>
>>>     final DataFile FILE_A = DataFiles.builder(spec)
>>>             .withPath("/path/to/data-a.parquet")
>>>             .withFileSizeInBytes(10)
>>>             .withRecordCount(1)
>>>             .build();
>>>
>>>     Table table = TABLES.create(SCHEMA, spec, tableProperties,
>>> tableDir.toURI().toString());
>>>
>>>     // It is an empty table, so there is no snapshot yet
>>>     Assert.assertEquals("Current snapshot must be null", null,
>>> table.currentSnapshot());
>>>
>>>     // start transaction t1
>>>     Transaction t1 = table.newTransaction();
>>>
>>>     // start transaction t2
>>>     Transaction t2 = table.newTransaction();
>>>
>>>     // t1 is adding a data file
>>>     t1.newAppend()
>>>             .appendFile(FILE_A)
>>>             .commit();
>>>
>>>     // t2 is adding a data file
>>>     t2.newAppend()
>>>             .appendFile(FILE_A)
>>>             .commit();
>>>
>>>     // commit transaction t1
>>>     t1.commitTransaction();
>>>
>>>     // commit transaction t2: My requirement is that the following
>>> commit must fail
>>>     t2.commitTransaction();
>>>
>>>     table.refresh();
>>>     List<ManifestFile> manifests =
>>> table.currentSnapshot().allManifests();
>>>
>>>     // Following assert fails since both transaction added one each
>>> manifest file
>>>     Assert.assertEquals("Should have 1 manifest file", 1,
>>> manifests.size());
>>>   }
>>>
>>> Please suggest whether there is a way to commit transactions such that
>>> the second one fails. Thank you so much.
>>>
>>

-- 
Ryan Blue
Tabular

Re: Concurrent transactions

Posted by Thippana Vamsi Kalyan <va...@dremio.com>.
>
> you basically get 1 retry for free off the bat,
>

Yes, I am getting this free retry even though I am setting
"commit.retry.num-retries" to zero

2) Creating a transaction that wraps a base transaction and that overrides
> commitTransaction to just fail if the metadata from when the object was
> created is not the same as the metadata refreshed from the operations when
> commit is called. Note: This would also mean that any operation (not just
> other transactions) would fail the transaction.
>

Yes, my requirement is to let only one write happen to the table. So,
overriding BaseTransaction and doing the transaction commit myself is an
option. I am hoping that I could avoid custom transaction logic in our
application, and contribute an acceptable solution to open source.

Will it be an acceptable change that honours "commit.retry.num-retries"
across multiple commit calls on the same update?
Or, is there anything better we can do to respect the 'noRetry' setting.
Currently the transaction is not supporting the noRetry case. It always
retries at least once.





On Sat, Feb 19, 2022 at 9:05 PM Russell Spitzer <ru...@gmail.com>
wrote:

> The issue with the above example is that starting a transaction is not a
> shared state amongst iceberg clients or the table itself. There is nothing
> that other clients could know about or check. If you look at the code for
> committing a transaction, it starts building the transaction off of
> whatever snapshot was available *at commit
> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>time
> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>,*.
> This means when you call t2.commitTransaction, it checks to see what the
> latest snapshot is and gets back t1's commit. Now if t2 had attempted to
> commit *concurrently with t1 *then only one of them would have succeeded
> and the other would have failed.
>
> Since we always refresh before trying to build the commit you basically
> get 1 retry for free off the bat, so unless the commits inside the
> transaction are not compatible with the current metadata you are good to go
> without any retries. I think if we changed to logic to attempt to refresh
> only after we fail to commit here
> <https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L377> we
> would get the effect you are looking for but I'm not sure this would be
> generically useful since I think most folks would like non-conflicting
> transactions to not require a retry.
>
> So for your use case I think maybe there are 2 solutions:
> 1) Make sure all commits contain some operation that would conflict if
> another transaction succeeded first. This seems like a difficult
> proposition to me
> 2) Creating a transaction that wraps a base transaction and that overrides
> commitTransaction to just fail if the metadata from when the object was
> created is not the same as the metadata refreshed from the operations when
> commit is called. Note: This would also mean that any operation (not just
> other transactions) would fail the transaction.
>
>
>
>
>
> On Sat, Feb 19, 2022 at 7:29 AM Thippana Vamsi Kalyan <va...@dremio.com>
> wrote:
>
>> Hi,
>>
>> I am trying to understand the usage of Transactions in Iceberg with
>> "commit.retry.num-retries" set to zero. My requirement is that the
>> transaction must fail if the table gets updated by any concurrent
>> transaction after opening the transaction.
>>
>> I wrote the following unit test in TestHadoopTables.java to verify the
>> behaviour. I am noticing that both transactions are committing one after
>> the other leading to an unexpected table state. Could anyone please confirm
>> if I am doing anything wrong, or whether Iceberg transaction commit logic
>> needs any change?
>>
>> This test is very simple. It opens two transactions one after another,
>> adds a file as part of the transaction, and commits them one after the
>> other. My requirement is that the second transaction must fail with
>> CommitFailedException. But, it is successfully committing.
>>
>> @Test
>>   public void testSimpleConcurrentTransaction() {
>>     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>>             .build();
>>
>>     // set table property to avoid retries during commit
>>     final Map<String, String> tableProperties = Stream.of(new String[][] {
>>             { TableProperties.COMMIT_NUM_RETRIES, "0"
>> }}).collect(Collectors.toMap(d->d[0], d->d[1]));
>>
>>     final DataFile FILE_A = DataFiles.builder(spec)
>>             .withPath("/path/to/data-a.parquet")
>>             .withFileSizeInBytes(10)
>>             .withRecordCount(1)
>>             .build();
>>
>>     Table table = TABLES.create(SCHEMA, spec, tableProperties,
>> tableDir.toURI().toString());
>>
>>     // It is an empty table, so there is no snapshot yet
>>     Assert.assertEquals("Current snapshot must be null", null,
>> table.currentSnapshot());
>>
>>     // start transaction t1
>>     Transaction t1 = table.newTransaction();
>>
>>     // start transaction t2
>>     Transaction t2 = table.newTransaction();
>>
>>     // t1 is adding a data file
>>     t1.newAppend()
>>             .appendFile(FILE_A)
>>             .commit();
>>
>>     // t2 is adding a data file
>>     t2.newAppend()
>>             .appendFile(FILE_A)
>>             .commit();
>>
>>     // commit transaction t1
>>     t1.commitTransaction();
>>
>>     // commit transaction t2: My requirement is that the following commit
>> must fail
>>     t2.commitTransaction();
>>
>>     table.refresh();
>>     List<ManifestFile> manifests = table.currentSnapshot().allManifests();
>>
>>     // Following assert fails since both transaction added one each
>> manifest file
>>     Assert.assertEquals("Should have 1 manifest file", 1,
>> manifests.size());
>>   }
>>
>> Please suggest whether there is a way to commit transactions such that
>> the second one fails. Thank you so much.
>>
>

Re: Concurrent transactions

Posted by Russell Spitzer <ru...@gmail.com>.
The issue with the above example is that starting a transaction is not a
shared state amongst iceberg clients or the table itself. There is nothing
that other clients could know about or check. If you look at the code for
committing a transaction, it starts building the transaction off of
whatever snapshot was available *at commit
<https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>time
<https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L363-L364>,*.
This means when you call t2.commitTransaction, it checks to see what the
latest snapshot is and gets back t1's commit. Now if t2 had attempted to
commit *concurrently with t1 *then only one of them would have succeeded
and the other would have failed.

Since we always refresh before trying to build the commit you basically get
1 retry for free off the bat, so unless the commits inside the transaction
are not compatible with the current metadata you are good to go without any
retries. I think if we changed to logic to attempt to refresh only after we
fail to commit here
<https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L377>
we
would get the effect you are looking for but I'm not sure this would be
generically useful since I think most folks would like non-conflicting
transactions to not require a retry.

So for your use case I think maybe there are 2 solutions:
1) Make sure all commits contain some operation that would conflict if
another transaction succeeded first. This seems like a difficult
proposition to me
2) Creating a transaction that wraps a base transaction and that overrides
commitTransaction to just fail if the metadata from when the object was
created is not the same as the metadata refreshed from the operations when
commit is called. Note: This would also mean that any operation (not just
other transactions) would fail the transaction.





On Sat, Feb 19, 2022 at 7:29 AM Thippana Vamsi Kalyan <va...@dremio.com>
wrote:

> Hi,
>
> I am trying to understand the usage of Transactions in Iceberg with
> "commit.retry.num-retries" set to zero. My requirement is that the
> transaction must fail if the table gets updated by any concurrent
> transaction after opening the transaction.
>
> I wrote the following unit test in TestHadoopTables.java to verify the
> behaviour. I am noticing that both transactions are committing one after
> the other leading to an unexpected table state. Could anyone please confirm
> if I am doing anything wrong, or whether Iceberg transaction commit logic
> needs any change?
>
> This test is very simple. It opens two transactions one after another,
> adds a file as part of the transaction, and commits them one after the
> other. My requirement is that the second transaction must fail with
> CommitFailedException. But, it is successfully committing.
>
> @Test
>   public void testSimpleConcurrentTransaction() {
>     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>             .build();
>
>     // set table property to avoid retries during commit
>     final Map<String, String> tableProperties = Stream.of(new String[][] {
>             { TableProperties.COMMIT_NUM_RETRIES, "0"
> }}).collect(Collectors.toMap(d->d[0], d->d[1]));
>
>     final DataFile FILE_A = DataFiles.builder(spec)
>             .withPath("/path/to/data-a.parquet")
>             .withFileSizeInBytes(10)
>             .withRecordCount(1)
>             .build();
>
>     Table table = TABLES.create(SCHEMA, spec, tableProperties,
> tableDir.toURI().toString());
>
>     // It is an empty table, so there is no snapshot yet
>     Assert.assertEquals("Current snapshot must be null", null,
> table.currentSnapshot());
>
>     // start transaction t1
>     Transaction t1 = table.newTransaction();
>
>     // start transaction t2
>     Transaction t2 = table.newTransaction();
>
>     // t1 is adding a data file
>     t1.newAppend()
>             .appendFile(FILE_A)
>             .commit();
>
>     // t2 is adding a data file
>     t2.newAppend()
>             .appendFile(FILE_A)
>             .commit();
>
>     // commit transaction t1
>     t1.commitTransaction();
>
>     // commit transaction t2: My requirement is that the following commit
> must fail
>     t2.commitTransaction();
>
>     table.refresh();
>     List<ManifestFile> manifests = table.currentSnapshot().allManifests();
>
>     // Following assert fails since both transaction added one each
> manifest file
>     Assert.assertEquals("Should have 1 manifest file", 1,
> manifests.size());
>   }
>
> Please suggest whether there is a way to commit transactions such that the
> second one fails. Thank you so much.
>