You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/17 20:57:01 UTC
[GitHub] [kafka] ijuma opened a new pull request, #13012: KAFKA-14477: Move LogValidator and related to storage module
ijuma opened a new pull request, #13012:
URL: https://github.com/apache/kafka/pull/13012
Follows #13010 and hence why it's still a draft.
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822697
##########
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##########
@@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.kafka.common.utils;
-package kafka.common
+import org.junit.jupiter.api.Test;
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
- val recordErrors: Seq[RecordError])
- extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+ @Test
+ public void testLongRef() {
Review Comment:
`IntRef` was there before, but yes, I can take the chance to improve coverage.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
- interBrokerProtocolVersion,
- brokerTopicStats,
+ interBrokerProtocolVersion
+ )
+ validator.validateMessagesAndAssignOffsets(offset,
+ validatorMetricsRecorder(brokerTopicStats.allTopicsStats),
Review Comment:
Good idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822187
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
+
+ // Visible for benchmarking
+ def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+ new LogValidator.MetricsRecorder {
+ def recordInvalidMagic(): Unit =
+ allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+ def recordInvalidOffset(): Unit =
Review Comment:
It seemed a bit arbitrary that we combined these into the same metric. I thought it may be cleaner to define the interface in a general way and then have the implementation match the current behavior. I don't feel too strongly though, so if you still prefer to combine these into the same method, I can do it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054846959
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
+
+ // Visible for benchmarking
+ def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+ new LogValidator.MetricsRecorder {
+ def recordInvalidMagic(): Unit =
+ allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+ def recordInvalidOffset(): Unit =
Review Comment:
Sounds good. We can keep this as it is then.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362078506
@junrao Pushed a change that addresses two of the review comments. I left a comment for the other review comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1361291206
JDK 17 build passed, the other failures are flaky. Re-running the test suite just in case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma merged PR #13012:
URL: https://github.com/apache/kafka/pull/13012
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054705599
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
}
+
+ // Visible for benchmarking
+ def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+ new LogValidator.MetricsRecorder {
+ def recordInvalidMagic(): Unit =
+ allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+ def recordInvalidOffset(): Unit =
Review Comment:
recordInvalidOffset() and recordInvalidSequence() do the same thing. Should we just have a single method?
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
- interBrokerProtocolVersion,
- brokerTopicStats,
+ interBrokerProtocolVersion
+ )
+ validator.validateMessagesAndAssignOffsets(offset,
+ validatorMetricsRecorder(brokerTopicStats.allTopicsStats),
Review Comment:
Could we just create a single instance of MetricsRecorder and reuse it for all appends?
##########
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##########
@@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.kafka.common.utils;
-package kafka.common
+import org.junit.jupiter.api.Test;
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
- val recordErrors: Seq[RecordError])
- extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+ @Test
+ public void testLongRef() {
Review Comment:
Should we add a similar test for IntRef() too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1360836868
cc @satishd
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module
Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362265880
JDK 17 build passed, other build failures are unrelated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org