You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/21 19:01:14 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054705599


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
     LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
+    new LogValidator.MetricsRecorder {
+      def recordInvalidMagic(): Unit =
+        allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+      def recordInvalidOffset(): Unit =

Review Comment:
   recordInvalidOffset() and recordInvalidSequence() do the same thing. Should we just have a single method?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                 config.messageTimestampDifferenceMaxMs,
                 leaderEpoch,
                 origin,
-                interBrokerProtocolVersion,
-                brokerTopicStats,
+                interBrokerProtocolVersion
+              )
+              validator.validateMessagesAndAssignOffsets(offset,
+                validatorMetricsRecorder(brokerTopicStats.allTopicsStats),

Review Comment:
   Could we just create a single instance of MetricsRecorder and reuse it for all appends?



##########
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##########
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.utils;
 
-package kafka.common
+import org.junit.jupiter.api.Test;
 
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
-                                val recordErrors: Seq[RecordError])
-  extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+    @Test
+    public void testLongRef() {

Review Comment:
   Should we add a similar test for IntRef() too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org