Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/02 16:11:33 UTC

[GitHub] [iceberg] fqaiser94 opened a new issue, #6514: Add Conditional Transaction Commits

fqaiser94 opened a new issue, #6514:
URL: https://github.com/apache/iceberg/issues/6514

   ### Feature Request / Improvement
   
   TLDR: I'd like to propose adding a new `void commitTransaction(CommitCondition commitCondition)` method to the `Transaction` interface so users can specify the conditions under which a transaction can be safely committed or not, given that other transactions may have changed the table concurrently. This will enable use-cases such as monotonically increasing watermarks in table properties. 
   
   # General Problem
   
   I want to start by describing the challenge I currently face using the existing `Transaction.commitTransaction()` API.
   
   Consider the following example situation: 
   - We have an iceberg table and we are maintaining a custom watermark in the table properties. 
     The expectation is that every time we append new data to the iceberg table, the custom watermark should be incremented. 
     Let's say the current value of this `"custom_watermark"` is `"0"` in our iceberg table's properties. 
   - Now let's say we have two transactions running concurrently against this Iceberg table. 
     Both transactions are appending a different datafile to the iceberg table: 
     `txn.newAppend().appendFile(dataFile).commit()`
     As part of each transaction, we also advance the `"custom_watermark"` table property to the next value, which is calculated by reading the latest value from the existing table properties and adding one, i.e. `"1"`: 
     `txn.updateProperties().set("custom_watermark", "1").commit()`
   - What happens next is that one of the transactions will "win" and be committed to the table first. 
     We have new data in the table and the custom watermark has been correctly incremented from `"0"` to `"1"`. So far so good.
   - The "loser" transaction will fail with `CommitFailedException` and be automatically retried (assuming `TableProperties.COMMIT_NUM_RETRIES >= 1`). The retry mechanism is where things get interesting.
   - First, Iceberg will refresh its view of the table. Internally this is done by calling `TableOperations.refresh()`, which returns an up-to-date `TableMetadata`. 
   - Then iceberg will re-apply the updates in the "losing" transaction on top of the refreshed `TableMetadata` i.e. 
     It will again attempt to append data to the table and set the custom watermark table property to `"1"`.
     You can already see the problem here: our custom watermark isn't being incremented correctly! 
   - Unfortunately, all Iceberg sees is a set of updates (from the "losing" transaction) that, as far as it knows, do not conflict in any way it cares about with the changes from the "winning" transaction, so it will happily attempt to commit the updated `TableMetadata` a second time and succeed in doing so. 
     Now we are in a "bad" state: although the table has new data, the custom watermark has not been advanced. 
   
   This scenario can be demonstrated in code like so: 
   
   ```
   @Test  
   public void watermarkIsNotIncrementedCorrectlyWithConcurrentTransactionCommits() throws Exception {  
     String customWatermarkKey = "custom_watermark";  
     
     table.updateProperties().set(customWatermarkKey, "0").commit();  
     Integer initialExpectedVersion = 1;  
     Assert.assertEquals("Table should be on version 1", initialExpectedVersion, version());  
     Assert.assertEquals(  
       "Initial custom watermark value is",  
       "0",  
       table.properties().get(customWatermarkKey));  
     
     Supplier<String> nextWatermarkValue = () ->  
       Optional.ofNullable(table.properties().get(customWatermarkKey))  
       .map(Integer::parseInt)  
       .map(x -> x + 1)  
       .map(String::valueOf)  
       .orElse("0");  
     
     Function<DataFile, Thread> makeThread = (dataFile) -> {  
       Transaction txn = table.newTransaction();  
       txn.newAppend().appendFile(dataFile).commit();  
       txn.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();  
       return new Thread(txn::commitTransaction);  
     };  
     
     Thread thread1 = makeThread.apply(FILE_A);  
     Thread thread2 = makeThread.apply(FILE_B);  
     
     thread1.start();  
     thread2.start();  
     
     thread1.join();  
     thread2.join();  
     
     Assert.assertEquals(  
       "Table should be on two versions ahead as two transactions have been committed successfully",  
       initialExpectedVersion + 2,  
       (int) version());  
     Assert.assertEquals(  
       "We want custom_watermark to also be incremented twice but in fact it appears to have been incremented only once",  
       "1",  
       table.properties().get(customWatermarkKey));  
   }
   ```
   
   You might think at this point that a simple solution to this problem is to set the `TableProperties.COMMIT_NUM_RETRIES` table property to `0`. 
   Setting this property ensures Iceberg will just throw a `CommitFailedException` instead of retrying "losing" transactions and putting us in a bad state. In that sense, this is an improvement. 
   Unfortunately, this only works for the specific sequence of events described above, and is not a general solution. 
   This is because before Iceberg even attempts to perform the atomic commit operation the first time (i.e. not on a retry attempt), it will first check whether the `TableMetadata` is up-to-date and if not it will refresh the `TableMetadata` before applying the updates and attempting the commit. 
   Put another way, the automatically-refresh-`TableMetadata` behaviour is not a part of the retry mechanism and so you can still hit this problem even without any retries. This situation can also be reproduced in code, as follows: 
   
   ```
   @Test  
   public void removingRetriesIsNotAGeneralSolution() throws Exception {  
     String customWatermarkKey = "custom_watermark";  
     
     table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "0").set(customWatermarkKey, "0").commit();  
     Integer initialExpectedVersion = 1;  
     Assert.assertEquals("Table should be on version 1", initialExpectedVersion, version());  
     Assert.assertEquals("Initial custom watermark value is", "0", table.properties().get(customWatermarkKey));  
     
     Supplier<String> nextWatermarkValue = () ->  
       Optional.ofNullable(table.properties().get(customWatermarkKey))  
       .map(Integer::parseInt)  
       .map(x -> x + 1)  
       .map(String::valueOf)  
       .orElse("0");  
     
     Transaction txn1 = table.newTransaction();  
     txn1.newAppend().appendFile(FILE_A).commit();  
     txn1.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();  
     
     // concurrent transaction which is committed before the first transaction ever calls .commit  
     Transaction txn2 = table.newTransaction();  
     txn2.newAppend().appendFile(FILE_B).commit();  
     txn2.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();  
     txn2.commitTransaction();  
     Assert.assertEquals("Table should be on next version", initialExpectedVersion + 1, (int) version());  
     Assert.assertEquals("Table watermark is incremented to 1", "1", table.properties().get(customWatermarkKey));  
     
     txn1.commitTransaction();  
     Assert.assertEquals("Table should be on next version", initialExpectedVersion + 2, (int) version());  
     Assert.assertEquals("Table watermark has seemingly not been incremented", "1", table.properties().get(customWatermarkKey));  
   }
   ```
   
   If anyone has easy ideas for solving this issue, I would love to hear them. 
   Otherwise, please read on for the solution I'm proposing. 
   
   # Proposed Solution
   
   One way to view this problem is as a case of missing information. 
   While Iceberg does perform some validation checks internally to ensure updates don't conflict (a `ValidationException` is thrown in these cases), these obviously can't cover use-case specific conditions such as custom watermarks. 
   The only way iceberg could know about these is if iceberg is told. 
   Hence I'm proposing we expose an API that allows users to give iceberg this information. 
   
   To me, it made the most sense to add this as an overloaded `commitTransaction` method to the existing `Transaction` interface. 
   
   ```
   interface CommitCondition {  
       boolean check(Table baseReadOnlyTable, Table newReadOnlyTable);  
   }
   
   interface Transaction {
       // ... existing methods ...
       void commitTransaction(CommitCondition commitCondition);
   }
   ```
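   
   To make the idea concrete, here is a minimal sketch of what a watermark condition might look like. It is not taken from the PR: `TableState` is a hypothetical stand-in for Iceberg's read-only `Table` (so the snippet compiles without Iceberg on the classpath), and `WatermarkConditions.incrementsByOne` is an illustrative name only.
   
   ```java
   import java.util.Map;
   
   // Hypothetical stand-in for a read-only table view; the proposal itself passes Iceberg `Table` objects.
   interface TableState {
     Map<String, String> properties();
   }
   
   // Same shape as the proposed interface, adapted to the stand-in type above.
   interface CommitCondition {
     boolean check(TableState base, TableState updated);
   }
   
   final class WatermarkConditions {
     private WatermarkConditions() {}
   
     // Allows the commit only if the new watermark is exactly the base watermark plus one.
     // A missing watermark is treated as 0, which is an arbitrary choice for this sketch.
     static CommitCondition incrementsByOne(String key) {
       return (base, updated) -> {
         int before = Integer.parseInt(base.properties().getOrDefault(key, "0"));
         int after = Integer.parseInt(updated.properties().getOrDefault(key, "0"));
         return after == before + 1;
       };
     }
   }
   ```
   
   In the retry scenario described earlier, the refreshed base table already carries `"1"` and the re-applied update sets `"1"` again, so `check` would return `false` and the stale commit would be rejected rather than silently applied.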
   
   I think I have a working proof-of-concept of this idea as a pull request here: https://github.com/apache/iceberg/pull/6513
   There are plenty of design decisions remaining to be discussed here: 
   - Does it make sense to also add an overloaded `void commit(CommitCondition commitCondition)` on the `PendingUpdate` interface as well so non-`Transaction` API users can also take advantage of conditional commits? 
   - What should the `CommitCondition` interface look like? 
   	- Should we expose the new state of the table that we're attempting to commit? 
   	  Or just the state of the base table (i.e. the `Table` changes were based on top of)? 
   	- Should users be allowed to throw their own exceptions inside `CommitCondition.check`? 
   	  Or should `CommitCondition.check` just return a `boolean`?
   
   Some of these are implementation details but I just want to make clear that I haven't figured all of this out yet. 
   If I can get buy-in that this is an actual problem worth solving and if the general approach in the PR makes sense, I would be happy to figure out the remaining details to take this draft pull request to the finish line. 
   
   # Specific Usecase: Committer Fencing to enable Exactly-Once Commits
   
   I've tried to describe the problem and solution above in as general a fashion as possible because I think this API could be used to enable many and varied use-cases beyond just custom watermarks. 
   It might be helpful to understand the specific use-case I'm trying to solve for. 
   I have a "datafile-committer" application which does the following: 
   - Reads messages from a kafka topic
   - Extracts the Datafile from each message 
   - Append-commits the Datafile to the appropriate iceberg table
   
   The challenge for us is that we would like to execute this logic in an exactly-once fashion. 
   Unfortunately, "datafile-committer" is a distributed kafka consumer application, and as a result it's possible for multiple instances of the application to occasionally handle the same message/Datafile in exceptional scenarios such as: 
   - When there's a lot of rebalancing going on or
   - When an application instance becomes disconnected i.e. a zombie process 
   
   Currently, in these exceptional scenarios, we can end up append-committing the same Datafile to an iceberg table multiple times (once for each instance that is handling the same message/Datafile). 
   Since each Datafile can contain hundreds of thousands of records, the resulting iceberg tables can have a very high number of duplicated records. 
   This is obviously bad. 
   
   While it is possible to minimize how often these events happen, it is nearly impossible to guarantee that they will never happen. 
   However, since Kafka messages are associated with monotonically increasing offsets, it's possible to store the last committed offset as a sort-of `custom_watermark` in the iceberg table properties. A `CommitCondition` can then check it at commit time to ensure that we only ever commit a Datafile whose offset is greater than the last offset committed to the iceberg table. 
   In this way, we could achieve effectively once guarantees (actually, there would be a little more logic needed to fence out zombie committers and get the desired exactly once guarantees but this is just more logic in the `CommitCondition`). 
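   
   The offset check can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions: `ModelTable` and `appendIfNewer` are hypothetical names, the "table" is a single atomically swapped value rather than real Iceberg metadata, and the extra zombie-fencing logic mentioned above is left out.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical in-memory model of a table, NOT Iceberg code:
   // the committed file names plus the last committed Kafka offset.
   final class ModelTable {
     static final class State {
       final List<String> files;
       final long lastOffset;
   
       State(List<String> files, long lastOffset) {
         this.files = Collections.unmodifiableList(new ArrayList<>(files));
         this.lastOffset = lastOffset;
       }
     }
   
     private final AtomicReference<State> current =
         new AtomicReference<>(new State(new ArrayList<>(), -1L));
   
     State state() {
       return current.get();
     }
   
     // Appends a file only if its offset is beyond the last committed offset.
     // Returns false when the condition fails, e.g. another instance already committed this offset.
     boolean appendIfNewer(String file, long offset) {
       while (true) {
         State base = current.get(); // "refresh": read the latest committed state
         if (offset <= base.lastOffset) {
           return false; // commit condition failed, so nothing is written
         }
         List<String> files = new ArrayList<>(base.files);
         files.add(file);
         State next = new State(files, offset);
         if (current.compareAndSet(base, next)) {
           return true; // atomic swap succeeded
         }
         // Another commit won the race; loop to re-read the state and re-check the condition.
       }
     }
   }
   ```
   
   The important detail is that the condition is re-evaluated against the refreshed state on every attempt, so a second instance handling the same message sees the advanced offset and gives up instead of appending the Datafile again.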
   
   Hopefully that helps explain where I'm coming from. 
   
   ### Query engine
   
   None


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] fqaiser94 commented on issue #6514: Support Conditional Transaction Commits

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on issue #6514:
URL: https://github.com/apache/iceberg/issues/6514#issuecomment-1420971731

   Created a PR for this here: https://github.com/apache/iceberg/pull/6513


[GitHub] [iceberg] fqaiser94 commented on issue #6514: Support Conditional Transaction Commits

Posted by GitBox <gi...@apache.org>.
fqaiser94 commented on issue #6514:
URL: https://github.com/apache/iceberg/issues/6514#issuecomment-1374910722

   Thanks for your response @rdblue!
   
   > What I recommend to accomplish the use case you're talking about is putting the watermark in snapshot properties instead of table properties. 
   
   Thanks for your advice. 
   It might be an option for us but I'll have to think some more about it. 
   I had some concerns about losing those watermarks due to snapshot expiration.
   Was also mildly concerned about how expensive it might be to have to search previous snapshots on every commit (assuming that involves multiple network calls). 
   Regardless, have to get this feature in first though!
   
   > I'd add a `validate(Predicate<TableMetadata> current)` to either `SnapshotUpdate`, or the more general `PendingUpdate`.
   
   Yea, I was considering this too. 
   
   Sounds like we are both more or less in alignment, so I'll try reworking my PR to implement it in the way you suggested. It might take me a few weeks, so please bear with me. 


[GitHub] [iceberg] rdblue commented on issue #6514: Support Conditional Transaction Commits

Posted by GitBox <gi...@apache.org>.
rdblue commented on issue #6514:
URL: https://github.com/apache/iceberg/issues/6514#issuecomment-1369254153

   @fqaiser94, in general I think this is a good idea, but I'm not sure there are very many use cases for it besides deduplicating high-level operations. I also think that using this as a way to make table properties transactional is probably a bad idea, but it's been requested in the past so we should probably have an approved way to accomplish it.
   
   Table properties purposely don't have transactional guarantees, to avoid using them to coordinate state. Table properties are supposed to be used to configure the table, not to hold important state. What I recommend to accomplish the use case you're talking about is putting the watermark in snapshot properties instead of table properties. That's what we do for Flink commits and we get exactly-once behavior, although the check for the watermark is done outside of the commit path. Concurrent Flink writes would use different watermark properties because they use watermarks that are job-specific.
   
   It's a good idea to provide a custom validation that can do any check you want. For example, your Kafka example could create watermarks based on some chunk of time that is being processed and the custom validation could check the last few snapshots to see whether another process has already committed. That's a good use case.
   
   To do this, I'd probably take a slightly different approach than the one you've implemented. I'd add a `validate(Predicate<TableMetadata> current)` to either `SnapshotUpdate`, or the more general `PendingUpdate`. That way each table operation can have its own custom validation against the current table state. Using a transaction would automatically check all of the custom validations for each operation, so there would be no need to alter `Transaction`.
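   
   As a rough sketch of how such a hook could behave (this is not existing Iceberg API): the types below are hypothetical stand-ins, with `ModelMetadata` playing the role of `TableMetadata` and `ModelUpdate` the role of a `PendingUpdate`, and `IllegalStateException` is used where Iceberg would more likely throw a `ValidationException`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Predicate;
   
   // Hypothetical stand-in for TableMetadata: only the latest snapshot's summary properties.
   final class ModelMetadata {
     final Map<String, String> snapshotSummary;
   
     ModelMetadata(Map<String, String> snapshotSummary) {
       this.snapshotSummary = snapshotSummary;
     }
   }
   
   // Hypothetical pending update that carries user-supplied validations.
   final class ModelUpdate {
     private final List<Predicate<ModelMetadata>> validations = new ArrayList<>();
   
     // Mirrors the suggested validate(Predicate<TableMetadata> current); returns this for chaining.
     ModelUpdate validate(Predicate<ModelMetadata> check) {
       validations.add(check);
       return this;
     }
   
     // Would run on every commit attempt, after the refresh and before the atomic swap.
     void validateAgainst(ModelMetadata current) {
       for (Predicate<ModelMetadata> check : validations) {
         if (!check.test(current)) {
           throw new IllegalStateException("Custom validation failed against current table state");
         }
       }
     }
   }
   ```
   
   A caller could then register something like `update.validate(m -> !"chunk-42".equals(m.snapshotSummary.get("watermark")))` to refuse the commit when another process has already committed the same chunk.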


[GitHub] [iceberg] mahdibh commented on issue #6514: Support Conditional Transaction Commits

Posted by "mahdibh (via GitHub)" <gi...@apache.org>.
mahdibh commented on issue #6514:
URL: https://github.com/apache/iceberg/issues/6514#issuecomment-1593653436

   I think one way to approach this problem (which we are also running into) is to expose a new commit API that doesn't automatically refresh the table metadata. This would allow clients to rely on the atomic snapshot CAS to also synchronize the state changes to custom properties (whether it's on the table or the snapshot). This is a very desirable capability since it allows us to update in one single atomic operation both the current snapshot and the properties of that snapshot.
   
   In our particular case, we would like to store some kafka offsets as part of the snapshot properties. Even if we had a single writer, there is always a case where we may end up with two writers (for example due to a network partition) and in that case, we don't want the last writer's state to overwrite the first one. We would like to use the CAS nature of iceberg as the last resort to detect concurrent writes.
   
   Another way to think about this is that each iceberg committer is moving the state of a table from `state-A` to `state-B`. Most clients do not care about what `state-A` is. All they care about is that it gets moved to `state-B`. So if another client gets in between and moves `state-A` to `state-C`, the first client will happily commit by doing the implicit refresh (which will pick up `state-C`) and then doing the commit, effectively moving the system from `state-C` to `state-B`.
   
   There are cases where clients want to be explicit about that state transition, for example because there are some attributes in `state-A` that they use to determine what `state-B` looks like (i.e., the watermark example in the original description or kafka offsets in our case).
   
   Another argument for not doing the refresh is that it adds extra latency (due to the retrieval of the current snapshot). In the single writer case, it is not needed since the common case is that there would be no conflicts and the writer always has the latest snapshot.
   
   I think providing the caller the option to decide on whether they care about the original state or not when they do a commit is a very powerful capability. It also seems like a rather straightforward change that can be applied in a backward compatible way.
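   
   The difference between the two commit styles can be sketched with a plain `AtomicReference`. This is a toy model, not Iceberg code: `ModelCatalog`, `commitWithRefresh` and `commitIfUnchanged` are hypothetical names, and the whole table state is reduced to a single string.
   
   ```java
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical model: the table's current state is one immutable value that is swapped atomically.
   final class ModelCatalog {
     private final AtomicReference<String> current = new AtomicReference<>("state-A");
   
     String load() {
       return current.get();
     }
   
     // Refresh-style commit: ignores what the caller started from and lands on top of the latest state.
     void commitWithRefresh(String newState) {
       current.set(newState);
     }
   
     // Strict commit: succeeds only if the table is still in the state the caller read.
     // compareAndSet compares references, so callers must pass the exact object returned by load().
     boolean commitIfUnchanged(String expectedBase, String newState) {
       return current.compareAndSet(expectedBase, newState);
     }
   }
   ```
   
   With the strict variant, a client that read `state-A` and then tries to commit `state-B` after someone else moved the table to `state-C` gets `false` back and can decide for itself whether to recompute, retry, or give up.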


[GitHub] [iceberg] fqaiser94 commented on issue #6514: Support Conditional Transaction Commits

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on issue #6514:
URL: https://github.com/apache/iceberg/issues/6514#issuecomment-1451250830

   Hi @stevenzwu, 
   
   Great job presenting at subsurface conference today :) 
   Regarding your comments: 
   
   > Regarding the Kafka data file committer use case, it is undesirable to have all the parallel threads committing to the Iceberg table. If the parallelism is 100 or 1,000, there will be a lot of collisions and retries. The conditional commit can ensure correctness, but it can be inefficient or infeasible with high parallelism. The Flink Iceberg sink coalesces all data files to a single committer task so that there is only one committer thread (in a Flink job) committing to the Iceberg table.
   
   I fully agree with you on all the points you've stated here. 
   If the parallelism is high, conditional commits will be extremely inefficient, to the point of being infeasible, even though they guarantee "correctness."
   In general, it is best to have just a single thread committing updates to an iceberg table, which is what Flink Iceberg Sink does.
   This is also exactly what we try to do in our custom system. 
   Our issue however is that we cannot guarantee that there will **always** be exactly a single thread committing a given datafile to the iceberg table. 
   In **_rare_** situations, and for only a **_brief_** moment in time, there may be more than one thread attempting to commit **_the same set of datafiles_** to the same iceberg table **_at the same time_**. 
   Due to the distributed nature of our datafile-committer application, it is impossible to completely rule out this type of situation; we can only make it rare. 
   This is where we want to use conditional commits as a last line of defence to ensure that only one of these threads is successful in committing the datafile during these rare and brief moments of instability. 
   
   I hope this explanation clarifies why conditional commits are still useful for datafile-committer use-cases like ours where you can guarantee that the majority of the time there will not be any conflicts. 
   
   > If we go with the conditional commit approach and fail the second commit with the lower watermark, how would the second application handle the failure? We can also get into the situation where the second application may never be able to commit. If its watermark is forever behind the first application's, the condition check will always be false. Not sure if this is a desirable behavior.
   
   The desired behaviour here will likely vary depending on the use-case. 
   For example, for our use-case, the second application which is hit with a failure would abandon trying to commit that set of datafiles and move on to the next set of datafiles. 
   Due to the nature of our application, we're guaranteed to eventually (and quickly) converge to a stable condition where each instance will be left handling messages that cannot conflict. 
   
   As far as the iceberg library is concerned, IMO it should just provide the fundamental building block i.e. a way for iceberg-api users to express the conditions under which a commit should proceed, and leave how to handle failures up to the API users. 


[GitHub] [iceberg] stevenzwu commented on issue #6514: Support Conditional Transaction Commits

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on issue #6514:
URL: https://github.com/apache/iceberg/issues/6514#issuecomment-1444388008

   Just to add to @rdblue 's good points above. 
   
   Regarding the watermark use cases, we can prefix the watermark (in snapshot metadata) from each writer job to avoid the conflict. The downside is that consumers of the watermark info need to aggregate and take the min value. If we go with the conditional commit approach and fail the second commit with the lower watermark, how would the second application handle the failure? We can also get into the situation where the second application may never be able to commit if its watermark is forever behind the first application's. The condition check will always be false.
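   
   For illustration, the consumer-side aggregation could look roughly like the sketch below. The key layout (`<job>.watermark`) and the `PrefixedWatermarks` helper are assumptions made for this example, not an existing Iceberg or Flink convention.
   
   ```java
   import java.util.Map;
   import java.util.OptionalLong;
   
   final class PrefixedWatermarks {
     private PrefixedWatermarks() {}
   
     // Returns the minimum over all properties whose key ends with the given suffix,
     // e.g. "job-a.watermark" and "job-b.watermark"; empty if no writer has reported one.
     static OptionalLong overall(Map<String, String> snapshotSummary, String suffix) {
       return snapshotSummary.entrySet().stream()
           .filter(e -> e.getKey().endsWith(suffix))
           .mapToLong(e -> Long.parseLong(e.getValue()))
           .min();
     }
   }
   ```
   
   Taking the minimum means the overall watermark only advances once every writer has advanced, which is the aggregation cost mentioned above.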
   
   Regarding the Kafka data file committer use case, it is undesirable to have all the parallel threads committing to the Iceberg table. If the parallelism is 100 or 1,000, there will be a lot of collisions and retries. The conditional commit can ensure correctness, but it can be inefficient or infeasible with high parallelism. The Flink Iceberg sink coalesces all data files to a single committer task so that there is only one committer thread (in a Flink job) committing to the Iceberg table.
   
   

