Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/17 20:57:01 UTC

[GitHub] [kafka] ijuma opened a new pull request, #13012: KAFKA-14477: Move LogValidator and related to storage module

ijuma opened a new pull request, #13012:
URL: https://github.com/apache/kafka/pull/13012

   Follows #13010, which is why it's still a draft.
   
   For broader context on this change, please check:
   
   * KAFKA-14470: Move log layer to storage module
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822697


##########
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##########
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.utils;
 
-package kafka.common
+import org.junit.jupiter.api.Test;
 
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
-                                val recordErrors: Seq[RecordError])
-  extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+    @Test
+    public void testLongRef() {

Review Comment:
   `IntRef` was there before, but yes, I can take the chance to improve coverage.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                 config.messageTimestampDifferenceMaxMs,
                 leaderEpoch,
                 origin,
-                interBrokerProtocolVersion,
-                brokerTopicStats,
+                interBrokerProtocolVersion
+              )
+              validator.validateMessagesAndAssignOffsets(offset,
+                validatorMetricsRecorder(brokerTopicStats.allTopicsStats),

Review Comment:
   Good idea.




[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822187


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
     LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+    new LogValidator.MetricsRecorder {
+      def recordInvalidMagic(): Unit =
+        allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+      def recordInvalidOffset(): Unit =

Review Comment:
   It seemed a bit arbitrary that we combined these into the same metric. I thought it might be cleaner to define the interface in a general way and have the implementation match the current behavior. I don't feel strongly, though, so if you still prefer to combine these into a single method, I can do that.
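
A minimal Java sketch of the trade-off described above, under stated assumptions: the interface exposes one callback per validation failure, and the implementation decides that offset and sequence failures mark the same meter. `SketchMeter`, `SketchMetricsRecorder`, `CombinedMetricsRecorder`, and the field names are hypothetical stand-ins for illustration, not the classes in the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a broker metrics meter; not Kafka's real metrics class.
final class SketchMeter {
    private final AtomicLong count = new AtomicLong();
    void mark() { count.incrementAndGet(); }
    long count() { return count.get(); }
}

// General interface: one callback per kind of validation failure.
interface SketchMetricsRecorder {
    void recordInvalidMagic();
    void recordInvalidOffset();
    void recordInvalidSequence();
}

// Implementation that preserves a combined metric: offset and sequence
// failures mark the same meter, while the interface keeps them distinct.
final class CombinedMetricsRecorder implements SketchMetricsRecorder {
    final SketchMeter invalidMagic = new SketchMeter();
    final SketchMeter invalidOffsetOrSequence = new SketchMeter();

    @Override public void recordInvalidMagic() { invalidMagic.mark(); }
    @Override public void recordInvalidOffset() { invalidOffsetOrSequence.mark(); }
    @Override public void recordInvalidSequence() { invalidOffsetOrSequence.mark(); }
}
```

With this shape, splitting the combined metric later would only change the implementation; callers of the interface would stay the same.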




[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054846959


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
     LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+    new LogValidator.MetricsRecorder {
+      def recordInvalidMagic(): Unit =
+        allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+      def recordInvalidOffset(): Unit =

Review Comment:
   Sounds good. We can keep this as it is then.




[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362078506

   @junrao Pushed a change that addresses two of the review comments. I left a comment for the other review comment.



[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1361291206

   JDK 17 build passed; the other failures are flaky. Re-running the test suite just in case.



[GitHub] [kafka] ijuma merged pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma merged PR #13012:
URL: https://github.com/apache/kafka/pull/13012



[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054705599


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
     LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+    new LogValidator.MetricsRecorder {
+      def recordInvalidMagic(): Unit =
+        allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+      def recordInvalidOffset(): Unit =

Review Comment:
   recordInvalidOffset() and recordInvalidSequence() do the same thing. Should we just have a single method?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                 config.messageTimestampDifferenceMaxMs,
                 leaderEpoch,
                 origin,
-                interBrokerProtocolVersion,
-                brokerTopicStats,
+                interBrokerProtocolVersion
+              )
+              validator.validateMessagesAndAssignOffsets(offset,
+                validatorMetricsRecorder(brokerTopicStats.allTopicsStats),

Review Comment:
   Could we just create a single instance of MetricsRecorder and reuse it for all appends?
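
One way to read this suggestion, sketched in Java under stated assumptions: the recorder is created once as a field and every append reuses it, instead of building a new anonymous instance per call. `ReusableRecorder` and `AppendSketch` are hypothetical names for illustration only; the counters exist just to make the single allocation observable.

```java
// Hypothetical stand-in for LogValidator.MetricsRecorder, reduced to one callback.
interface ReusableRecorder {
    void recordInvalidMagic();
}

// Hypothetical log class: the recorder is built once and shared by every append.
final class AppendSketch {
    private int recordersCreated;   // counts allocations, for illustration only
    private int invalidMagicCount;  // stands in for a real metric
    private final ReusableRecorder recorder = createRecorder();

    private ReusableRecorder createRecorder() {
        recordersCreated++;
        return () -> invalidMagicCount++;
    }

    // Each append passes through the shared recorder instead of allocating a new one.
    void append(boolean invalidMagic) {
        if (invalidMagic) {
            recorder.recordInvalidMagic();
        }
    }

    int recordersCreated() { return recordersCreated; }
    int invalidMagicCount() { return invalidMagicCount; }
}
```

This relies on the recorder holding no per-append state, so sharing it across appends is safe in this sketch.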



##########
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##########
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.utils;
 
-package kafka.common
+import org.junit.jupiter.api.Test;
 
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
-                                val recordErrors: Seq[RecordError])
-  extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+    @Test
+    public void testLongRef() {

Review Comment:
   Should we add a similar test for `IntRef` too?




[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1360836868

   cc @satishd 



[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362265880

   JDK 17 build passed; the other build failures are unrelated.

