Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/10/05 20:07:47 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9516: fix spammy logs for ConfluentSchemaRegistryRealtimeClusterIntegrationTest [MINOR]

walterddr commented on code in PR #9516:
URL: https://github.com/apache/pinot/pull/9516#discussion_r985994470


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java:
##########
@@ -129,29 +130,39 @@ protected void pushAvroIntoKafka(List<File> avroFiles)
         "io.confluent.kafka.serializers.KafkaAvroSerializer");
     Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps);
 
+    // this producer produces intentionally malformatted records so that
+    // we can test the behavior when consuming such records
     Properties nonAvroProducerProps = new Properties();
     nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
     nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
     nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
-    Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps);
+    Producer<byte[], byte[]> invalidDataProducer = new KafkaProducer<>(nonAvroProducerProps);
 
     if (injectTombstones()) {
       // publish lots of tombstones to livelock the consumer if it can't handle this properly
       for (int i = 0; i < 1000; i++) {
         // publish a tombstone first
-        nonAvroProducer.send(
+        avroProducer.send(
             new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null));
       }
     }
+
     for (File avroFile : avroFiles) {
+      int numInvalidRecords = 0;
       try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
         for (GenericRecord genericRecord : reader) {
           byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis())
               : (genericRecord.get(getPartitionColumn())).toString().getBytes();
-          // Ignore getKafkaMessageHeader()
-          nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
+
+          if (numInvalidRecords < NUM_INVALID_RECORDS) {
+            // send a few rubbish records to validate that the consumer will skip over non-avro records, but
+            // don't spam them every time as it causes log spam

Review Comment:
   Do we still need to change the test behavior, given that you already suppressed these logs in the log4j2.xml config?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

