You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/14 05:09:45 UTC

[pinot] branch master updated: Fix a flaky test LLCRealtimeClusterIntegrationTest.testAddRemoveDictionaryAndInvertedIndex. (#11096)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d7a8faa5 Fix a flaky test LLCRealtimeClusterIntegrationTest.testAddRemoveDictionaryAndInvertedIndex. (#11096)
35d7a8faa5 is described below

commit 35d7a8faa5e969602a637a68f158c51fdc594c38
Author: Shen Yu <sh...@startree.ai>
AuthorDate: Thu Jul 13 22:09:39 2023 -0700

    Fix a flaky test LLCRealtimeClusterIntegrationTest.testAddRemoveDictionaryAndInvertedIndex. (#11096)
---
 .../tests/ClusterIntegrationTestUtils.java         | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index aeccabb55d..457b3d2971 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -352,18 +352,18 @@ public class ClusterIntegrationTestUtils {
   public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
       @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
       throws Exception {
-
+    long counter = 0;
     if (injectTombstones) {
       // publish lots of tombstones to livelock the consumer if it can't handle this properly
       for (int i = 0; i < 1000; i++) {
         // publish a tombstone first
-        producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+        producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
       }
     }
     CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
     try (CSVParser parser = CSVParser.parse(csvFile, StandardCharsets.UTF_8, csvFormat)) {
       for (CSVRecord csv : parser) {
-        byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+        byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
             : csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
         List<String> cols = new ArrayList<>();
         for (String col : csv) {
@@ -386,19 +386,19 @@ public class ClusterIntegrationTestUtils {
   public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
       @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
       throws Exception {
-
+    long counter = 0;
     if (injectTombstones) {
       // publish lots of tombstones to livelock the consumer if it can't handle this properly
       for (int i = 0; i < 1000; i++) {
         // publish a tombstone first
-        producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+        producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
       }
     }
     CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
     for (String recordCsv: csvRecords) {
       try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
         for (CSVRecord csv : parser) {
-          byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+          byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
               : csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
           List<String> cols = new ArrayList<>();
           for (String col : csv) {
@@ -435,12 +435,13 @@ public class ClusterIntegrationTestUtils {
     StreamDataProducer producer =
         StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
 
+    long counter = 0;
     try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
       if (injectTombstones) {
         // publish lots of tombstones to livelock the consumer if it can't handle this properly
         for (int i = 0; i < 1000; i++) {
           // publish a tombstone first
-          producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+          producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
         }
       }
       for (File avroFile : avroFiles) {
@@ -455,7 +456,7 @@ public class ClusterIntegrationTestUtils {
             datumWriter.write(genericRecord, binaryEncoder);
             binaryEncoder.flush();
 
-            byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+            byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter++)
                 : (genericRecord.get(partitionColumn)).toString().getBytes();
             byte[] bytes = outputStream.toByteArray();
             producer.produce(kafkaTopic, keyBytes, bytes);
@@ -491,6 +492,7 @@ public class ClusterIntegrationTestUtils {
     // initiate transaction.
     producer.initTransactions();
     producer.beginTransaction();
+    long counter = 0;
     try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
       for (File avroFile : avroFiles) {
         try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
@@ -504,7 +506,7 @@ public class ClusterIntegrationTestUtils {
             datumWriter.write(genericRecord, binaryEncoder);
             binaryEncoder.flush();
 
-            byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+            byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter++)
                 : (genericRecord.get(partitionColumn)).toString().getBytes();
             byte[] bytes = outputStream.toByteArray();
             ProducerRecord<byte[], byte[]> record = new ProducerRecord(kafkaTopic, keyBytes, bytes);
@@ -543,6 +545,7 @@ public class ClusterIntegrationTestUtils {
     properties.put("request.required.acks", "1");
     properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
 
+    long counter = 0;
     StreamDataProducer producer =
         StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
     try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
@@ -562,7 +565,7 @@ public class ClusterIntegrationTestUtils {
           datumWriter.write(genericRecord, binaryEncoder);
           binaryEncoder.flush();
 
-          byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+          byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter++)
               : (genericRecord.get(partitionColumn)).toString().getBytes();
           byte[] bytes = outputStream.toByteArray();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org