Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/14 05:09:45 UTC
[pinot] branch master updated: Fix a flaky test LLCRealtimeClusterIntegrationTest.testAddRemoveDicionaryAndInvertedIndex. (#11096)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 35d7a8faa5 Fix a flaky test LLCRealtimeClusterIntegrationTest.testAddRemoveDicionaryAndInvertedIndex. (#11096)
35d7a8faa5 is described below
commit 35d7a8faa5e969602a637a68f158c51fdc594c38
Author: Shen Yu <sh...@startree.ai>
AuthorDate: Thu Jul 13 22:09:39 2023 -0700
Fix a flaky test LLCRealtimeClusterIntegrationTest.testAddRemoveDicionaryAndInvertedIndex. (#11096)
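The patch replaces the System.currentTimeMillis()-based Kafka record keys in ClusterIntegrationTestUtils with a monotonically increasing counter, scoped to each push helper. Timestamp keys repeat whenever several records are produced within the same millisecond, so those records all hash to the same partition and the spread of data across partitions varies from run to run; a counter makes every key unique and the partition assignment repeatable, which is presumably what made the test flaky. A standalone sketch of the pattern appears after the diff.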
---
.../tests/ClusterIntegrationTestUtils.java | 23 ++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index aeccabb55d..457b3d2971 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -352,18 +352,18 @@ public class ClusterIntegrationTestUtils {
public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
throws Exception {
-
+ long counter = 0;
if (injectTombstones) {
// publish lots of tombstones to livelock the consumer if it can't handle this properly
for (int i = 0; i < 1000; i++) {
// publish a tombstone first
- producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+ producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
try (CSVParser parser = CSVParser.parse(csvFile, StandardCharsets.UTF_8, csvFormat)) {
for (CSVRecord csv : parser) {
- byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+ byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
: csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
List<String> cols = new ArrayList<>();
for (String col : csv) {
@@ -386,19 +386,19 @@ public class ClusterIntegrationTestUtils {
public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
throws Exception {
-
+ long counter = 0;
if (injectTombstones) {
// publish lots of tombstones to livelock the consumer if it can't handle this properly
for (int i = 0; i < 1000; i++) {
// publish a tombstone first
- producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+ producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
for (String recordCsv: csvRecords) {
try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
for (CSVRecord csv : parser) {
- byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+ byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
: csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
List<String> cols = new ArrayList<>();
for (String col : csv) {
@@ -435,12 +435,13 @@ public class ClusterIntegrationTestUtils {
StreamDataProducer producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+ long counter = 0;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
if (injectTombstones) {
// publish lots of tombstones to livelock the consumer if it can't handle this properly
for (int i = 0; i < 1000; i++) {
// publish a tombstone first
- producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+ producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
}
}
for (File avroFile : avroFiles) {
@@ -455,7 +456,7 @@ public class ClusterIntegrationTestUtils {
datumWriter.write(genericRecord, binaryEncoder);
binaryEncoder.flush();
- byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+ byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter++)
: (genericRecord.get(partitionColumn)).toString().getBytes();
byte[] bytes = outputStream.toByteArray();
producer.produce(kafkaTopic, keyBytes, bytes);
@@ -491,6 +492,7 @@ public class ClusterIntegrationTestUtils {
// initiate transaction.
producer.initTransactions();
producer.beginTransaction();
+ long counter = 0;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
for (File avroFile : avroFiles) {
try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
@@ -504,7 +506,7 @@ public class ClusterIntegrationTestUtils {
datumWriter.write(genericRecord, binaryEncoder);
binaryEncoder.flush();
- byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+ byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter++)
: (genericRecord.get(partitionColumn)).toString().getBytes();
byte[] bytes = outputStream.toByteArray();
ProducerRecord<byte[], byte[]> record = new ProducerRecord(kafkaTopic, keyBytes, bytes);
@@ -543,6 +545,7 @@ public class ClusterIntegrationTestUtils {
properties.put("request.required.acks", "1");
properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+ long counter = 0;
StreamDataProducer producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
@@ -562,7 +565,7 @@ public class ClusterIntegrationTestUtils {
datumWriter.write(genericRecord, binaryEncoder);
binaryEncoder.flush();
- byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(System.currentTimeMillis())
+ byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter++)
: (genericRecord.get(partitionColumn)).toString().getBytes();
byte[] bytes = outputStream.toByteArray();
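For context, here is a minimal standalone sketch of the keying pattern the commit adopts. This is illustration only, not Pinot code: the KafkaProducer setup, broker address, topic name, and class name are assumptions, while Longs.toByteArray is the same Guava helper the diff uses. A long counter serialized to 8 bytes yields a distinct key per record, whereas System.currentTimeMillis() can return the same value for consecutive records, collapsing them onto one key and hence one partition under Kafka's default key-hashing partitioner:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import com.google.common.primitives.Longs;

    public class CounterKeyedPushSketch {
      public static void main(String[] args) {
        // Assumed producer config for a local test broker.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        long counter = 0; // one counter per push call, as in the patch
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
          for (String row : new String[] {"a,1", "b,2", "c,3"}) {
            // counter++ never repeats, so each record gets a distinct key and the
            // default partitioner spreads records the same way on every run.
            // System.currentTimeMillis() could return one value for all three rows,
            // sending them all to a single partition.
            byte[] keyBytes = Longs.toByteArray(counter++);
            producer.send(new ProducerRecord<>("testTopic", keyBytes,
                row.getBytes(StandardCharsets.UTF_8)));
          }
        }
      }
    }

Either way the produce call itself is unchanged; only the key source differs, which is why the fix is a one-line substitution at each call site in the diff.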