Posted to jira@kafka.apache.org by "mehbey (via GitHub)" <gi...@apache.org> on 2023/05/12 00:33:45 UTC

[GitHub] [kafka] mehbey opened a new pull request, #13709: Added log timestamp validation to prevent records with timestamps tha…

mehbey opened a new pull request, #13709:
URL: https://github.com/apache/kafka/pull/13709

   ### What changed
   Added a validation to check if the record timestamp is in the future compared to the broker's timestamp and throw an exception to reject the record.
   
   The existing validation that checks the record's timestamp against the configured ```timestampDiffMaxMs``` remains unchanged. This new validation handles scenarios where producers send a future timestamp for a record.
   Specific changes are:
   - Updated validation logic in LogValidator
   - Added Unit test coverage for the change
   - Update Unit tests that failed because of the new validation logic
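
A minimal sketch of how the two checks described above could sit side by side. This is not the actual `LogValidator` code: the class name, the method signature, and the one-hour tolerance are assumptions for illustration, and the sketch assumes the topic uses `CREATE_TIME` timestamps.

```java
import java.util.Optional;

// Hypothetical stand-in for the validation described above; not the real LogValidator.
final class TwoStepTimestampCheck {
    // Assumed tolerance of one hour; the constant's real value is not shown in this thread.
    static final long TIME_DRIFT_TOLERANCE_MS = 60L * 60L * 1000L;
    // Same sentinel value as RecordBatch.NO_TIMESTAMP.
    static final long NO_TIMESTAMP = -1L;

    // Returns an error message if the record should be rejected, otherwise empty.
    static Optional<String> validate(long recordTimestampMs, long nowMs,
                                     long timestampDiffMaxMs, long offset) {
        if (recordTimestampMs == NO_TIMESTAMP) {
            return Optional.empty();
        }
        long timestampDiff = recordTimestampMs - nowMs;
        // New check: reject records that are too far ahead of the broker clock.
        if (timestampDiff > TIME_DRIFT_TOLERANCE_MS) {
            return Optional.of("Timestamp " + recordTimestampMs + " of message with offset "
                + offset + " is ahead of the server's time (" + nowMs + ").");
        }
        // Existing check: reject records outside the configured window around "now".
        if (Math.abs(timestampDiff) > timestampDiffMaxMs) {
            return Optional.of("Timestamp " + recordTimestampMs + " of message with offset "
                + offset + " is out of range. The timestamp should be within ["
                + (nowMs - timestampDiffMaxMs) + ", " + (nowMs + timestampDiffMaxMs) + "]");
        }
        return Optional.empty();
    }
}
```

With a very large `timestampDiffMaxMs`, only the first check can reject a record, which is the case this PR targets.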
   
   ### Why?
   https://issues.apache.org/jira/browse/KAFKA-14991
   Improves the accuracy of the log validation logic and avoids unexpected gotchas for customers
   
   
   ### Testing
   - Added relevant unit tests
   - Reproduced the issue by setting nanoseconds instead of milliseconds in the producer logic and verified that the validation works as expected. Example API response:
   
   ```
   "responses":[{"name":"myTopic1","partitionResponses":[{"index":0,"errorCode":32,"baseOffset":-1,"logAppendTimeMs":-1,"logStartOffset":0,"recordErrors":[
   {"batchIndex":0,"batchIndexErrorMessage":"Timestamp 1755933141855875 of message with offset 0 is ahead of the server's time. 1683838582815"},
   {"batchIndex":1,"batchIndexErrorMessage":"Timestamp 1755933754530625 of message with offset 1 is ahead of the server's time. 1683838582815"},
   ....
   ],"errorMessage":"One or more records have been rejected due to invalid timestamp"}]}]
   ``` 
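
For illustration, the kind of unit mix-up behind this reproduction can be sketched as follows. The class and method names are hypothetical, and the exact producer code used for the reproduction is not shown in this thread; the sketch only shows how the same instant looks when expressed in nanoseconds but interpreted as milliseconds.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing how a unit mix-up inflates a record timestamp.
final class TimestampUnitMixup {
    // What the broker expects: milliseconds since the epoch.
    static long correctMillis(Instant t) {
        return t.toEpochMilli();
    }

    // The mistake: the same instant expressed in nanoseconds since the epoch.
    static long mistakenNanos(Instant t) {
        return TimeUnit.SECONDS.toNanos(t.getEpochSecond()) + t.getNano();
    }

    // How far ahead of the broker clock a timestamp appears, in whole days,
    // when the broker interprets it as milliseconds.
    static long apparentDaysAhead(long recordTimestamp, long brokerNowMs) {
        return TimeUnit.MILLISECONDS.toDays(recordTimestamp - brokerNowMs);
    }

    public static void main(String[] args) {
        Instant now = Instant.ofEpochMilli(1683838582815L); // server time from the response above
        System.out.println("millis: " + correctMillis(now));
        System.out.println("nanos:  " + mistakenNanos(now));
        System.out.println("days ahead: " + apparentDaysAhead(mistakenNanos(now), correctMillis(now)));
    }
}
```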
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] kirktrue commented on pull request #13709: Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1545953842

   Rather than hard-code the allowable skew, should it be a configuration option that defaults to 24 hours?
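
As a rough sketch of that suggestion, the tolerance could be read from configuration and fall back to 24 hours. The property name `example.timestamp.future.tolerance.ms` and the class below are hypothetical, not an existing Kafka configuration or API.

```java
import java.util.Properties;

// Hypothetical config lookup; the key below is illustrative, not a real Kafka config.
final class FutureToleranceConfig {
    static final String TOLERANCE_KEY = "example.timestamp.future.tolerance.ms";
    // Default of 24 hours, as suggested in the comment above.
    static final long DEFAULT_TOLERANCE_MS = 24L * 60L * 60L * 1000L;

    // Returns the configured tolerance in milliseconds, or the default if unset.
    static long toleranceMs(Properties props) {
        String raw = props.getProperty(TOLERANCE_KEY);
        if (raw == null) {
            return DEFAULT_TOLERANCE_MS;
        }
        long value = Long.parseLong(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(TOLERANCE_KEY + " must be non-negative, got " + value);
        }
        return value;
    }
}
```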




[GitHub] [kafka] showuon commented on pull request #13709: Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1545649144

   @mehbey, thanks for the PR. I think the motivation for this change makes sense. But since it makes the validator stricter than before, I'm thinking we might need to propose a small KIP for this change. After all, after this change, some messages might no longer be accepted by the broker, which will break existing behavior.
   
   Let me know if you need any help.




[GitHub] [kafka] kirktrue commented on a diff in pull request #13709: Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13709:
URL: https://github.com/apache/kafka/pull/13709#discussion_r1192539289


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -590,18 +593,25 @@ private static Optional<ApiRecordError> validateTimestamp(RecordBatch batch,
                                                               TimestampType timestampType,
                                                               long timestampDiffMaxMs) {
         if (timestampType == TimestampType.CREATE_TIME
-                && record.timestamp() != RecordBatch.NO_TIMESTAMP
-                && Math.abs(record.timestamp() - now) > timestampDiffMaxMs)
-            return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
-                "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
-                + " is out of range. The timestamp should be within [" + (now - timestampDiffMaxMs)
-                + ", " + (now + timestampDiffMaxMs) + "]")));
+                && record.timestamp() != RecordBatch.NO_TIMESTAMP){
+            final long timestampDiff = record.timestamp() - now;
+            if(timestampDiff > TIME_DRIFT_TOLERANCE){
+                return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
+                        "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
+                                + " is ahead of the server's time. " + now)));

Review Comment:
   nit: Can we change this:
   
   ```scala
   + " is ahead of the server's time. " + now)));
   ```
   
   to
   
   ```scala
   + " is ahead of the server's time (" + now + ").")));
   ```





[GitHub] [kafka] mehbey commented on a diff in pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "mehbey (via GitHub)" <gi...@apache.org>.
mehbey commented on code in PR #13709:
URL: https://github.com/apache/kafka/pull/13709#discussion_r1192680413


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -590,18 +593,25 @@ private static Optional<ApiRecordError> validateTimestamp(RecordBatch batch,
                                                               TimestampType timestampType,
                                                               long timestampDiffMaxMs) {
         if (timestampType == TimestampType.CREATE_TIME
-                && record.timestamp() != RecordBatch.NO_TIMESTAMP
-                && Math.abs(record.timestamp() - now) > timestampDiffMaxMs)
-            return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
-                "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
-                + " is out of range. The timestamp should be within [" + (now - timestampDiffMaxMs)
-                + ", " + (now + timestampDiffMaxMs) + "]")));
+                && record.timestamp() != RecordBatch.NO_TIMESTAMP){
+            final long timestampDiff = record.timestamp() - now;
+            if(timestampDiff > TIME_DRIFT_TOLERANCE){
+                return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
+                        "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
+                                + " is ahead of the server's time. " + now)));

Review Comment:
   updated - thank you





[GitHub] [kafka] clolov commented on pull request #13709: Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1545742076

   On a separate note, if you prefix the PR name with the JIRA ticket, in this case KAFKA-14991 (i.e. KAFKA-14991: Added a validation to check if the record timestamp is in the future compared to the broker's timestamp) this would automatically link the PR with the JIRA ticket for visibility. 




[GitHub] [kafka] clolov commented on pull request #13709: Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1545738266

   I second Luke on this. I also have a slight preference to explore if we can make the check determine whether the timestamp is in nano- or milliseconds or change the client to explicitly send the time unit as well. If this proves undoable/unreasonable I would like to discuss whether we should surface the time period as a configurable property.
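
One way such a unit check could look is a magnitude heuristic: rescale the timestamp and see which unit puts it close to the broker clock. The class, method, and tolerance parameter below are hypothetical, and the heuristic is only a sketch; it returns no answer when no unit fits.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical heuristic: guess a timestamp's unit by checking which scaling
// brings it within a tolerance of the broker's current time.
final class TimestampUnitGuess {
    static Optional<TimeUnit> guessUnit(long timestamp, long brokerNowMs, long toleranceMs) {
        if (Math.abs(timestamp - brokerNowMs) <= toleranceMs) {
            return Optional.of(TimeUnit.MILLISECONDS);
        }
        if (Math.abs(timestamp / 1_000L - brokerNowMs) <= toleranceMs) {
            return Optional.of(TimeUnit.MICROSECONDS);
        }
        if (Math.abs(timestamp / 1_000_000L - brokerNowMs) <= toleranceMs) {
            return Optional.of(TimeUnit.NANOSECONDS);
        }
        // No scaling lands near the broker clock, so the unit cannot be inferred.
        return Optional.empty();
    }
}
```

A heuristic like this cannot tell a wrong unit from a value that is simply wrong, which is one reason an explicit unit from the client or a configurable limit may be easier to reason about.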




[GitHub] [kafka] mehbey closed pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "mehbey (via GitHub)" <gi...@apache.org>.
mehbey closed pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp
URL: https://github.com/apache/kafka/pull/13709




[GitHub] [kafka] mehbey commented on pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "mehbey (via GitHub)" <gi...@apache.org>.
mehbey commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1546133742

   @showuon and @clolov - Thank you for your suggestion to create a small KIP. I will update this thread once the KIP is ready.
   
   @kirktrue, I agree that we can make the allowable skew a configuration option with a default value of one hour. In the KIP, I will discuss the merits of this approach and explore any use cases that may require changing the default value.




[GitHub] [kafka] mehbey commented on pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "mehbey (via GitHub)" <gi...@apache.org>.
mehbey commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1581221839

   Hey @showuon , the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation#KIP937) for this pull request (PR) is published, and there have been a few discussions. I wanted to notify you and see if you have any comments, as you originally brought up the idea of writing the KIP. Please take a look whenever you have the time, in case you haven't already seen it.




[GitHub] [kafka] mehbey commented on pull request #13709: KAFKA-14991:Added a validation to check if the record timestamp is in the future compared to the broker's timestamp

Posted by "mehbey (via GitHub)" <gi...@apache.org>.
mehbey commented on PR #13709:
URL: https://github.com/apache/kafka/pull/13709#issuecomment-1648169813

   Based on the proposed implementation in the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation), closing this pull request as we are no longer pursuing the change proposed in this PR.
   
   

