Posted to commits@pinot.apache.org by ja...@apache.org on 2020/05/15 03:26:48 UTC
[incubator-pinot] branch master updated: Clean up all integration tests and standardize the creation of schema, table config and segments (#5385)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 46e2701 Clean up all integration tests and standardize the creation of schema, table config and segments (#5385)
46e2701 is described below
commit 46e2701230530cd93491c53a6f83caad09bbc97c
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu May 14 20:26:38 2020 -0700
Clean up all integration tests and standardize the creation of schema, table config and segments (#5385)
- Create schema and table config based on the settings in the BaseClusterIntegrationTest
- Remove the methods with huge argument lists
- Always use the schema and table config in the cluster to create the segments
- Simplify the cluster setup methods
The following integration tests are removed:
- DefaultCommitterRealtimeIntegrationTest: Merged into LLCRealtimeClusterIntegrationTest
- PinotURIUploadIntegrationTest: Already covered in the module-level FileUploadDownloadClientTest
ControllerPeriodicTasksIntegrationTest is refactored (it was not running before because of the wrong test name '*Tests')
RealtimeStressTest is removed as it is the same as BenchmarkRealtimeConsumptionSpeed
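With the standardized helpers added in this change, a typical offline test setup might look roughly like the sketch below. This is an illustrative snippet inside a hypothetical BaseClusterIntegrationTest subclass, not part of the commit; the addSchema(schema) and unpackAvroData(...) helper names are assumptions based on the methods visible in the diff.

  // Sketch: create the schema and table config from the test settings, then
  // build segments from the Avro files and upload them to the cluster.
  Schema schema = createSchema();                        // loads getSchemaFileName() from the classpath
  addSchema(schema);                                     // assumed ControllerTest helper that posts the schema
  TableConfig tableConfig = createOfflineTableConfig();  // built from the overridable getters (indexes, tenants, replicas)
  addTableConfig(tableConfig);
  List<File> avroFiles = unpackAvroData(_tempDir);       // assumed helper; any list of Avro files works
  ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
  uploadSegments(getTableName(), _tarDir);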
---
.../common/utils/FileUploadDownloadClient.java | 10 +-
.../pinot/controller/helix/ControllerTest.java | 58 +-
.../generator/SegmentGeneratorConfig.java | 5 +
.../tests/BaseClusterIntegrationTest.java | 305 ++++++-----
.../tests/BaseClusterIntegrationTestSet.java | 6 +-
.../tests/ClusterIntegrationTestUtils.java | 151 +++---
.../pinot/integration/tests/ClusterTest.java | 252 +--------
.../ControllerPeriodicTasksIntegrationTest.java | 379 +++++++++++++
.../ControllerPeriodicTasksIntegrationTests.java | 586 ---------------------
...vertToRawIndexMinionClusterIntegrationTest.java | 9 +-
.../DefaultCommitterRealtimeIntegrationTest.java | 202 -------
...lakyConsumerRealtimeClusterIntegrationTest.java | 8 -
.../tests/HybridClusterIntegrationTest.java | 86 ++-
...ridClusterIntegrationTestCommandLineRunner.java | 138 +++--
.../tests/JsonPathClusterIntegrationTest.java | 172 ++----
.../tests/LLCRealtimeClusterIntegrationTest.java | 63 +--
.../LuceneRealtimeClusterIntegrationTest.java | 230 ++++----
.../tests/MapTypeClusterIntegrationTest.java | 56 +-
...onaryAggregationPlanClusterIntegrationTest.java | 411 +++++----------
.../tests/OfflineClusterIntegrationTest.java | 179 +++----
.../tests/PinotURIUploadIntegrationTest.java | 273 ----------
.../tests/RealtimeClusterIntegrationTest.java | 25 +-
.../tests/SegmentCompletionIntegrationTest.java | 15 +-
.../tests/SimpleMinionClusterIntegrationTest.java | 12 +-
.../tests/StarTreeClusterIntegrationTest.java | 123 ++---
.../tests/UploadRefreshDeleteIntegrationTest.java | 296 -----------
.../apache/pinot/server/util/SegmentTestUtils.java | 68 ---
pinot-perf/pom.xml | 5 -
.../perf/BenchmarkRealtimeConsumptionSpeed.java | 69 +--
.../org/apache/pinot/perf/RealtimeStressTest.java | 129 -----
30 files changed, 1402 insertions(+), 2919 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 9172fa3..4e9a303 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -497,20 +498,19 @@ public class FileUploadDownloadClient implements Closeable {
* @param uri URI
* @param segmentName Segment name
* @param segmentFile Segment file
- * @param rawTableName Raw table name
+ * @param tableName Table name with or without type suffix
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
- public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String rawTableName)
+ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String tableName)
throws IOException, HttpErrorStatusException {
// Add table name as a request parameter
- NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
- List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+ NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
+ List<NameValuePair> parameters = Collections.singletonList(tableNameValuePair);
return uploadSegment(uri, segmentName, segmentFile, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
}
-
/**
* Upload segment with segment file input stream.
*
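As a usage note for the uploadSegment change above, a minimal sketch of calling the renamed API (host, port, segmentTarFile, and table name are illustrative; the table name may now be passed with or without the type suffix):

  try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
    URI uri = FileUploadDownloadClient.getUploadSegmentHttpURI("localhost", 9000);
    // A table name with the type suffix is now accepted in addition to the raw name
    SimpleHttpResponse response =
        client.uploadSegment(uri, segmentTarFile.getName(), segmentTarFile, "myTable_OFFLINE");
    assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
  }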
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e02ff42..89c365d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -65,12 +65,15 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -81,6 +84,8 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLE
import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
/**
@@ -407,7 +412,58 @@ public abstract class ControllerTest {
throws IOException {
String url = _controllerRequestURLBuilder.forSchemaCreate();
PostMethod postMethod = sendMultipartPostRequest(url, schema.toSingleLineJsonString());
- Assert.assertEquals(postMethod.getStatusCode(), 200);
+ assertEquals(postMethod.getStatusCode(), 200);
+ }
+
+ protected Schema getSchema(String schemaName) {
+ Schema schema = _helixResourceManager.getSchema(schemaName);
+ assertNotNull(schema);
+ return schema;
+ }
+
+ protected void addTableConfig(TableConfig tableConfig)
+ throws IOException {
+ sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+ }
+
+ protected void updateTableConfig(TableConfig tableConfig)
+ throws IOException {
+ sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableConfig.getTableName()),
+ tableConfig.toJsonString());
+ }
+
+ protected TableConfig getOfflineTableConfig(String tableName) {
+ TableConfig offlineTableConfig = _helixResourceManager.getOfflineTableConfig(tableName);
+ Assert.assertNotNull(offlineTableConfig);
+ return offlineTableConfig;
+ }
+
+ protected TableConfig getRealtimeTableConfig(String tableName) {
+ TableConfig realtimeTableConfig = _helixResourceManager.getRealtimeTableConfig(tableName);
+ Assert.assertNotNull(realtimeTableConfig);
+ return realtimeTableConfig;
+ }
+
+ protected void dropOfflineTable(String tableName)
+ throws IOException {
+ sendDeleteRequest(
+ _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.OFFLINE.tableNameWithType(tableName)));
+ }
+
+ protected void dropRealtimeTable(String tableName)
+ throws IOException {
+ sendDeleteRequest(
+ _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
+ }
+
+ protected void reloadOfflineTable(String tableName)
+ throws IOException {
+ sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE.name()), null);
+ }
+
+ protected void reloadRealtimeTable(String tableName)
+ throws IOException {
+ sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.REALTIME.name()), null);
}
protected String getBrokerTenantRequestPayload(String tenantName, int numBrokers) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 8fa4835..7126ca4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -130,6 +130,11 @@ public class SegmentGeneratorConfig {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (indexingConfig != null) {
+ String segmentVersion = indexingConfig.getSegmentFormatVersion();
+ if (segmentVersion != null) {
+ _segmentVersion = SegmentVersion.valueOf(segmentVersion);
+ }
+
List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
Map<String, String> noDictionaryColumnMap = indexingConfig.getNoDictionaryConfig();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 4e74c42..074d2d3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -20,28 +20,31 @@ package org.apache.pinot.integration.tests;
import com.google.common.base.Function;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -53,29 +56,30 @@ import org.testng.Assert;
public abstract class BaseClusterIntegrationTest extends ClusterTest {
// Default settings
- private static final String DEFAULT_TABLE_NAME = "mytable";
- private static final String DEFAULT_SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
- private static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch";
- private static final String DEFAULT_AVRO_TAR_FILE_NAME =
+ protected static final String DEFAULT_TABLE_NAME = "mytable";
+ protected static final String DEFAULT_SCHEMA_NAME = "mytable";
+ protected static final String DEFAULT_SCHEMA_FILE_NAME =
+ "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+ protected static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch";
+ protected static final String DEFAULT_AVRO_TAR_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.tar.gz";
- private static final long DEFAULT_COUNT_STAR_RESULT = 115545L;
- private static final int DEFAULT_LLC_SEGMENT_FLUSH_SIZE = 5000;
- private static final int DEFAULT_HLC_SEGMENT_FLUSH_SIZE = 20000;
- private static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
- private static final int DEFAULT_HLC_NUM_KAFKA_BROKERS = 1;
- private static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
- private static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
- private static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
- private static final String DEFAULT_SORTED_COLUMN = "Carrier";
- private static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
- private static final List<String> DEFAULT_RAW_INDEX_COLUMNS =
+ protected static final long DEFAULT_COUNT_STAR_RESULT = 115545L;
+ protected static final int DEFAULT_LLC_SEGMENT_FLUSH_SIZE = 5000;
+ protected static final int DEFAULT_HLC_SEGMENT_FLUSH_SIZE = 20000;
+ protected static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
+ protected static final int DEFAULT_HLC_NUM_KAFKA_BROKERS = 1;
+ protected static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
+ protected static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
+ protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
+ protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+ protected static final String DEFAULT_SORTED_COLUMN = "Carrier";
+ protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
- private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Arrays.asList("", "Origin");
+ private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin");
+ protected static final int DEFAULT_NUM_REPLICAS = 1;
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
- protected final File _avroDir = new File(_tempDir, "avroDir");
- protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<StreamDataServerStartable> _kafkaStarters;
@@ -87,14 +91,20 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
/**
* The following getters can be overridden to change default settings.
*/
+
protected String getTableName() {
return DEFAULT_TABLE_NAME;
}
+ protected String getSchemaName() {
+ return DEFAULT_SCHEMA_NAME;
+ }
+
protected String getSchemaFileName() {
return DEFAULT_SCHEMA_FILE_NAME;
}
+ @Nullable
protected String getTimeColumnName() {
return DEFAULT_TIME_COLUMN_NAME;
}
@@ -107,10 +117,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
return DEFAULT_COUNT_STAR_RESULT;
}
- protected String getKafkaTopic() {
- return getClass().getSimpleName();
- }
-
protected boolean useLlc() {
return false;
}
@@ -135,6 +141,14 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
}
+ protected int getBaseKafkaPort() {
+ return KafkaStarterUtils.DEFAULT_KAFKA_PORT;
+ }
+
+ protected String getKafkaZKAddress() {
+ return KafkaStarterUtils.DEFAULT_ZK_STR;
+ }
+
protected int getNumKafkaPartitions() {
if (useLlc()) {
return DEFAULT_LLC_NUM_KAFKA_PARTITIONS;
@@ -143,6 +157,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
}
+ protected String getKafkaTopic() {
+ return getClass().getSimpleName();
+ }
+
protected int getMaxNumKafkaMessagesPerBatch() {
return DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH;
}
@@ -168,18 +186,34 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
@Nullable
- protected List<String> getBloomFilterIndexColumns() {
- return DEFAULT_BLOOM_FILTER_COLUMNS;
+ protected List<String> getNoDictionaryColumns() {
+ return DEFAULT_NO_DICTIONARY_COLUMNS;
}
@Nullable
protected List<String> getRangeIndexColumns() {
- return DEFAULT_RANGE_INDEX_COLUMNS;
+ return null;
+ // TODO: Fix the JVM crash then uncomment this line
+// return DEFAULT_RANGE_INDEX_COLUMNS;
+ }
+
+ @Nullable
+ protected List<String> getBloomFilterColumns() {
+ return DEFAULT_BLOOM_FILTER_COLUMNS;
}
@Nullable
- protected List<String> getRawIndexColumns() {
- return DEFAULT_RAW_INDEX_COLUMNS;
+ protected List<FieldConfig> getFieldConfigs() {
+ return null;
+ }
+
+ protected int getNumReplicas() {
+ return DEFAULT_NUM_REPLICAS;
+ }
+
+ @Nullable
+ protected String getSegmentVersion() {
+ return null;
}
@Nullable
@@ -193,23 +227,114 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
@Nullable
- protected String getServerTenant() {
+ protected String getBrokerTenant() {
return TagNameUtils.DEFAULT_TENANT_NAME;
}
@Nullable
- protected String getBrokerTenant() {
+ protected String getServerTenant() {
return TagNameUtils.DEFAULT_TENANT_NAME;
}
- protected SegmentPartitionConfig getSegmentPartitionConfig() {
+ /**
+ * The following methods are based on the getters. Override the getters for non-default settings before calling these
+ * methods.
+ */
+
+ /**
+ * Creates a new schema.
+ */
+ protected Schema createSchema()
+ throws IOException {
+ URL resourceUrl = BaseClusterIntegrationTest.class.getClassLoader().getResource(getSchemaFileName());
+ Assert.assertNotNull(resourceUrl);
+ return Schema.fromFile(new File(resourceUrl.getFile()));
+ }
+
+ /**
+ * Returns the schema in the cluster.
+ */
+ protected Schema getSchema() {
+ return getSchema(getSchemaName());
+ }
+
+ /**
+ * Creates a new OFFLINE table config.
+ */
+ protected TableConfig createOfflineTableConfig() {
+ return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName())
+ .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+ .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+ .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+ .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+ .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+ .setServerTenant(getServerTenant()).build();
+ }
- ColumnPartitionConfig columnPartitionConfig = new ColumnPartitionConfig("murmur", 2);
- Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
- columnPartitionConfigMap.put("AirlineID", columnPartitionConfig);
+ /**
+ * Returns the OFFLINE table config in the cluster.
+ */
+ protected TableConfig getOfflineTableConfig() {
+ return getOfflineTableConfig(getTableName());
+ }
+
+ /**
+ * Creates a new REALTIME table config.
+ */
+ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+ Map<String, String> streamConfigs = new HashMap<>();
+ String streamType = "kafka";
+ streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
+ boolean useLlc = useLlc();
+ if (useLlc) {
+ // LLC
+ streamConfigs
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ StreamConfig.ConsumerType.LOWLEVEL.toString());
+ streamConfigs.put(KafkaStreamConfigProperties
+ .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST),
+ KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+ } else {
+ // HLC
+ streamConfigs
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ StreamConfig.ConsumerType.HIGHLEVEL.toString());
+ streamConfigs.put(KafkaStreamConfigProperties
+ .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
+ KafkaStarterUtils.DEFAULT_ZK_STR);
+ streamConfigs.put(KafkaStreamConfigProperties
+ .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER),
+ KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+ }
+ streamConfigs.put(StreamConfigProperties
+ .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
+ getStreamConsumerFactoryClassName());
+ streamConfigs
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
+ getKafkaTopic());
+ AvroFileSchemaKafkaAvroMessageDecoder.avroFile = sampleAvroFile;
+ streamConfigs
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ AvroFileSchemaKafkaAvroMessageDecoder.class.getName());
+ streamConfigs
+ .put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(getRealtimeSegmentFlushSize()));
+ streamConfigs.put(StreamConfigProperties
+ .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
+ .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+ .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+ .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+ .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+ .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+ .setServerTenant(getServerTenant()).setLLC(useLlc).setStreamConfigs(streamConfigs).build();
+ }
- SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(columnPartitionConfigMap);
- return segmentPartitionConfig;
+ /**
+ * Returns the REALTIME table config in the cluster.
+ */
+ protected TableConfig getRealtimeTableConfig() {
+ return getRealtimeTableConfig(getTableName());
}
/**
@@ -245,55 +370,23 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
/**
- * Set up H2 connection to a table with pre-loaded data.
- *
- * @param avroFiles List of Avro files to be loaded.
- * @param executor Executor
- * @throws Exception
+ * Sets up the H2 connection to a table with pre-loaded data.
*/
- protected void setUpH2Connection(final List<File> avroFiles, Executor executor)
+ protected void setUpH2Connection(List<File> avroFiles)
throws Exception {
Assert.assertNull(_h2Connection);
Class.forName("org.h2.Driver");
_h2Connection = DriverManager.getConnection("jdbc:h2:mem:");
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, getTableName(), _h2Connection);
- } catch (Exception e) {
- // Ignored
- }
- }
- });
+ ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, getTableName(), _h2Connection);
}
/**
- * Set up query generator using the given Avro files.
- *
- * @param avroFiles List of Avro files
- * @param executor Executor
+ * Sets up the query generator using the given Avro files.
*/
- protected void setUpQueryGenerator(final List<File> avroFiles, Executor executor) {
+ protected void setUpQueryGenerator(List<File> avroFiles) {
Assert.assertNull(_queryGenerator);
- final String tableName = getTableName();
- executor.execute(new Runnable() {
- @Override
- public void run() {
- _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
- }
- });
- }
-
- /**
- * Get the schema file.
- *
- * @return Schema file
- */
- protected File getSchemaFile() {
- URL resourceUrl = BaseClusterIntegrationTest.class.getClassLoader().getResource(getSchemaFileName());
- Assert.assertNotNull(resourceUrl);
- return new File(resourceUrl.getFile());
+ String tableName = getTableName();
+ _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
}
/**
@@ -311,30 +404,19 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
/**
- * Push the data in the given Avro files into a Kafka stream.
+ * Pushes the data in the given Avro files into a Kafka stream.
*
* @param avroFiles List of Avro files
- * @param kafkaTopic Kafka topic
- * @param executor Executor
*/
- protected void pushAvroIntoKafka(final List<File> avroFiles, final String kafkaTopic, Executor executor) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
- } catch (Exception e) {
- // Ignored
- }
- }
- });
+ protected void pushAvroIntoKafka(List<File> avroFiles)
+ throws Exception {
+ ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, "localhost:" + getBaseKafkaPort(), getKafkaTopic(),
+ getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
}
protected void startKafka() {
- _kafkaStarters = KafkaStarterUtils
- .startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
- KafkaStarterUtils.getDefaultKafkaConfiguration());
+ _kafkaStarters = KafkaStarterUtils.startServers(getNumKafkaBrokers(), getBaseKafkaPort(), getKafkaZKAddress(),
+ KafkaStarterUtils.getDefaultKafkaConfiguration());
_kafkaStarters.get(0)
.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
}
@@ -407,31 +489,4 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
ClusterIntegrationTestUtils
.testSqlQuery(pinotQuery, _brokerBaseApiUrl, getPinotConnection(), sqlQueries, getH2Connection());
}
-
- protected void setUpRealtimeTable(File avroFile, int numReplicas, boolean useLLC, String tableName)
- throws Exception {
- File schemaFile = getSchemaFile();
- Schema schema = Schema.fromFile(schemaFile);
- String schemaName = schema.getSchemaName();
- addSchema(schemaFile, schemaName);
-
- String timeColumnName = getTimeColumnName();
- FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
- Assert.assertNotNull(fieldSpec);
- TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
- TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
- Assert.assertNotNull(outgoingTimeUnit);
- String timeType = outgoingTimeUnit.toString();
-
- addRealtimeTable(tableName, useLLC, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
- getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
- getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(), getInvertedIndexColumns(),
- getBloomFilterIndexColumns(), getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(),
- numReplicas);
- }
-
- protected void setUpRealtimeTable(File avroFile)
- throws Exception {
- setUpRealtimeTable(avroFile, 1, useLlc(), getTableName());
- }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 89cda04..da4cf16 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -526,7 +526,7 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
public void testReload(boolean includeOfflineTable)
throws Exception {
String rawTableName = getTableName();
- Schema schema = Schema.fromFile(getSchemaFile());
+ Schema schema = getSchema();
String selectStarQuery = "SELECT * FROM " + rawTableName;
JsonNode queryResponse = postQuery(selectStarQuery);
@@ -554,9 +554,9 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
// Reload the table
if (includeOfflineTable) {
- sendPostRequest(_controllerRequestURLBuilder.forTableReload(rawTableName, "OFFLINE"), null);
+ reloadOfflineTable(rawTableName);
}
- sendPostRequest(_controllerRequestURLBuilder.forTableReload(rawTableName, "REALTIME"), null);
+ reloadRealtimeTable(rawTableName);
// Wait for all segments to finish reloading, and test querying the new columns
// NOTE: Use count query to prevent schema inconsistency error
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 2782e54..64b1c47 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -40,9 +40,9 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
@@ -65,12 +65,12 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
-import org.apache.pinot.server.util.SegmentTestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.testng.Assert;
@@ -92,8 +92,7 @@ public class ClusterIntegrationTestUtils {
* @throws Exception
*/
@SuppressWarnings("SqlNoDataSourceInspection")
- public static void setUpH2TableWithAvro(@Nonnull List<File> avroFiles, @Nonnull String tableName,
- @Nonnull Connection h2Connection)
+ public static void setUpH2TableWithAvro(List<File> avroFiles, String tableName, Connection h2Connection)
throws Exception {
int numFields;
@@ -196,7 +195,7 @@ public class ClusterIntegrationTestUtils {
* @param avroFieldType Avro field type
* @return Whether the given Avro field type is a single value type (non-NULL)
*/
- private static boolean isSingleValueAvroFieldType(@Nonnull Schema.Type avroFieldType) {
+ private static boolean isSingleValueAvroFieldType(Schema.Type avroFieldType) {
return (avroFieldType == Schema.Type.BOOLEAN) || (avroFieldType == Schema.Type.INT) || (avroFieldType
== Schema.Type.LONG) || (avroFieldType == Schema.Type.FLOAT) || (avroFieldType == Schema.Type.DOUBLE) || (
avroFieldType == Schema.Type.STRING);
@@ -210,9 +209,7 @@ public class ClusterIntegrationTestUtils {
* @param nullable Whether the column is nullable
* @return H2 field name and type
*/
- @Nonnull
- private static String buildH2FieldNameAndType(@Nonnull String fieldName, @Nonnull Schema.Type avroFieldType,
- boolean nullable) {
+ private static String buildH2FieldNameAndType(String fieldName, Schema.Type avroFieldType, boolean nullable) {
String avroFieldTypeName = avroFieldType.getName();
String h2FieldType;
switch (avroFieldTypeName) {
@@ -234,74 +231,68 @@ public class ClusterIntegrationTestUtils {
}
/**
- * Builds segments from the given Avro files. Each segment will be built using a separate thread.
- * @param avroFiles List of Avro files
+ * Builds Pinot segments from the given Avro files. Each segment will be built using a separate thread.
+ *
+ * @param avroFiles List of Avro files
+ * @param tableConfig Pinot table config
+ * @param schema Pinot schema
* @param baseSegmentIndex Base segment index number
* @param segmentDir Output directory for the un-tarred segments
* @param tarDir Output directory for the tarred segments
- * @param tableName Table name
- * @param starTreeV2BuilderConfigs List of star-tree V2 builder configs
- * @param rawIndexColumns Columns to create raw index with
- * @param pinotSchema Pinot schema
- * @param executor Executor
*/
- public static void buildSegmentsFromAvro(List<File> avroFiles, int baseSegmentIndex, File segmentDir, File tarDir,
- String tableName, @Nullable List<StarTreeV2BuilderConfig> starTreeV2BuilderConfigs,
- @Nullable List<String> rawIndexColumns, @Nullable org.apache.pinot.spi.data.Schema pinotSchema,
- Executor executor) {
- int numSegments = avroFiles.size();
- for (int i = 0; i < numSegments; i++) {
- final File avroFile = avroFiles.get(i);
- final int segmentIndex = i + baseSegmentIndex;
- executor.execute(() -> {
- try {
- File outputDir = new File(segmentDir, "segment-" + segmentIndex);
- SegmentGeneratorConfig segmentGeneratorConfig =
- SegmentTestUtils.getSegmentGeneratorConfig(avroFile, outputDir, TimeUnit.DAYS, tableName, pinotSchema);
-
- // Test segment with space and special character in the file name
- segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
-
- if (starTreeV2BuilderConfigs != null) {
- segmentGeneratorConfig.setStarTreeV2BuilderConfigs(starTreeV2BuilderConfigs);
- }
-
- if (rawIndexColumns != null) {
- segmentGeneratorConfig.setRawIndexCreationColumns(rawIndexColumns);
- }
-
- // Build the segment
- SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig);
- driver.build();
-
- // Tar the segment
- File[] files = outputDir.listFiles();
- Assert.assertNotNull(files);
- File segmentFile = files[0];
- String segmentName = segmentFile.getName();
- TarGzCompressionUtils
- .createTarGzOfDirectory(segmentFile.getAbsolutePath(), new File(tarDir, segmentName).getAbsolutePath());
- } catch (Exception e) {
- // Ignored
- }
- });
+ public static void buildSegmentsFromAvro(List<File> avroFiles, TableConfig tableConfig,
+ org.apache.pinot.spi.data.Schema schema, int baseSegmentIndex, File segmentDir, File tarDir)
+ throws Exception {
+ int numAvroFiles = avroFiles.size();
+ if (numAvroFiles == 1) {
+ buildSegmentFromAvro(avroFiles.get(0), tableConfig, schema, baseSegmentIndex, segmentDir, tarDir);
+ } else {
+ ExecutorService executorService = Executors.newFixedThreadPool(numAvroFiles);
+ List<Future<Void>> futures = new ArrayList<>(numAvroFiles);
+ for (int i = 0; i < numAvroFiles; i++) {
+ File avroFile = avroFiles.get(i);
+ int segmentIndex = i + baseSegmentIndex;
+ futures.add(executorService.submit(() -> {
+ buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex, segmentDir, tarDir);
+ return null;
+ }));
+ }
+ executorService.shutdown();
+ for (Future<Void> future : futures) {
+ future.get();
+ }
}
}
/**
- * Builds segments from the given Avro files. Each segment will be built using a separate thread.
+ * Builds one Pinot segment from the given Avro file.
*
- * @param avroFiles List of Avro files
- * @param baseSegmentIndex Base segment index number
+ * @param avroFile Avro file
+ * @param tableConfig Pinot table config
+ * @param schema Pinot schema
+ * @param segmentIndex Segment index number
* @param segmentDir Output directory for the un-tarred segments
* @param tarDir Output directory for the tarred segments
- * @param tableName Table name
- * @param executor Executor
*/
- public static void buildSegmentsFromAvro(List<File> avroFiles, int baseSegmentIndex, File segmentDir, File tarDir,
- String tableName, Executor executor) {
- buildSegmentsFromAvro(avroFiles, baseSegmentIndex, segmentDir, tarDir, tableName, null, null, null, executor);
+ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
+ org.apache.pinot.spi.data.Schema schema, int segmentIndex, File segmentDir, File tarDir)
+ throws Exception {
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+ segmentGeneratorConfig.setInputFilePath(avroFile.getPath());
+ segmentGeneratorConfig.setOutDir(segmentDir.getPath());
+ segmentGeneratorConfig.setTableName(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
+ // Test segment with space and special character in the file name
+ segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
+
+ // Build the segment
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+
+ // Tar the segment
+ String segmentName = driver.getSegmentName();
+ File indexDir = new File(segmentDir, driver.getSegmentName());
+ TarGzCompressionUtils.createTarGzOfDirectory(indexDir.getPath(), new File(tarDir, segmentName).getPath());
}
/**
@@ -315,9 +306,8 @@ public class ClusterIntegrationTestUtils {
* @param partitionColumn Optional partition column
* @throws Exception
*/
- public static void pushAvroIntoKafka(@Nonnull List<File> avroFiles, @Nonnull String kafkaBroker,
- @Nonnull String kafkaTopic, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
- @Nullable String partitionColumn)
+ public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaBroker, String kafkaTopic,
+ int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn)
throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBroker);
@@ -364,8 +354,8 @@ public class ClusterIntegrationTestUtils {
* @throws Exception
*/
@SuppressWarnings("unused")
- public static void pushRandomAvroIntoKafka(@Nonnull File avroFile, @Nonnull String kafkaBroker,
- @Nonnull String kafkaTopic, int numKafkaMessagesToPush, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
+ public static void pushRandomAvroIntoKafka(File avroFile, String kafkaBroker, String kafkaTopic,
+ int numKafkaMessagesToPush, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
@Nullable String partitionColumn)
throws Exception {
Properties properties = new Properties();
@@ -444,8 +434,7 @@ public class ClusterIntegrationTestUtils {
* @param fieldType Field type
* @return Random value for the given field type
*/
- @Nonnull
- private static Object generateRandomValue(@Nonnull Schema.Type fieldType) {
+ private static Object generateRandomValue(Schema.Type fieldType) {
switch (fieldType) {
case BOOLEAN:
return RANDOM.nextBoolean();
@@ -480,8 +469,8 @@ public class ClusterIntegrationTestUtils {
* @param h2Connection H2 connection
* @throws Exception
*/
- public static void testQuery(@Nonnull String pinotQuery, @Nonnull String queryFormat, @Nonnull String brokerUrl,
- @Nonnull org.apache.pinot.client.Connection pinotConnection, @Nullable List<String> sqlQueries,
+ public static void testQuery(String pinotQuery, String queryFormat, String brokerUrl,
+ org.apache.pinot.client.Connection pinotConnection, @Nullable List<String> sqlQueries,
@Nullable Connection h2Connection)
throws Exception {
// Use broker response for metadata check, connection response for value check
@@ -1000,8 +989,8 @@ public class ClusterIntegrationTestUtils {
* @param failureMessage Failure message
* @param e Exception
*/
- private static void failure(@Nonnull String pqlQuery, @Nullable List<String> sqlQueries,
- @Nonnull String failureMessage, @Nullable Exception e) {
+ private static void failure(String pqlQuery, @Nullable List<String> sqlQueries, String failureMessage,
+ @Nullable Exception e) {
failureMessage += "\nPQL: " + pqlQuery;
if (sqlQueries != null) {
failureMessage += "\nSQL: " + sqlQueries;
@@ -1020,8 +1009,7 @@ public class ClusterIntegrationTestUtils {
* @param sqlQueries H2 SQL queries
* @param failureMessage Failure message
*/
- private static void failure(@Nonnull String pqlQuery, @Nullable List<String> sqlQueries,
- @Nonnull String failureMessage) {
+ private static void failure(String pqlQuery, @Nullable List<String> sqlQueries, String failureMessage) {
failure(pqlQuery, sqlQueries, failureMessage, null);
}
@@ -1034,8 +1022,7 @@ public class ClusterIntegrationTestUtils {
* @param value raw value.
* @return converted value.
*/
- @Nonnull
- private static String convertBooleanToLowerCase(@Nonnull String value) {
+ private static String convertBooleanToLowerCase(String value) {
if (value.equals("TRUE")) {
return "true";
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 1f221ea..d4f7494 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -23,11 +23,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -52,35 +50,23 @@ import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.minion.MinionStarter;
import org.apache.pinot.minion.events.MinionEventObserverFactory;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
-import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.server.starter.helix.DefaultHelixStarterServerConfig;
import org.apache.pinot.server.starter.helix.HelixServerStarter;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
/**
@@ -96,17 +82,14 @@ public abstract class ClusterTest extends ControllerTest {
private List<HelixServerStarter> _serverStarters;
private MinionStarter _minionStarter;
- // TODO: clean this up
- private TableConfig _realtimeTableConfig;
-
protected void startBroker()
throws Exception {
startBrokers(1);
}
- protected void startBroker(int basePort, String zkStr)
+ protected void startBroker(int port, String zkStr)
throws Exception {
- startBrokers(1, basePort, zkStr);
+ startBrokers(1, port, zkStr);
}
protected void startBrokers(int numBrokers)
@@ -245,102 +228,42 @@ public abstract class ClusterTest extends ControllerTest {
_minionStarter = null;
}
- protected void addSchema(File schemaFile, String schemaName)
- throws Exception {
- try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- fileUploadDownloadClient
- .addSchema(FileUploadDownloadClient.getUploadSchemaHttpURI(LOCAL_HOST, _controllerPort), schemaName,
- schemaFile);
- }
- }
-
/**
* Upload all segments inside the given directory to the cluster.
*
- * @param segmentDir Segment directory
+ * @param tarDir Segment directory
*/
- protected void uploadSegments(String tableName, File segmentDir)
+ protected void uploadSegments(String tableName, File tarDir)
throws Exception {
- String[] segmentNames = segmentDir.list();
- assertNotNull(segmentNames);
+ File[] segmentTarFiles = tarDir.listFiles();
+ assertNotNull(segmentTarFiles);
+ int numSegments = segmentTarFiles.length;
+ assertTrue(numSegments > 0);
+
+ URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- final URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-
- // Upload all segments in parallel
- int numSegments = segmentNames.length;
- ExecutorService executor = Executors.newFixedThreadPool(numSegments);
- List<Future<Integer>> tasks = new ArrayList<>(numSegments);
- for (final String segmentName : segmentNames) {
- final File segmentFile = new File(segmentDir, segmentName);
- tasks.add(executor.submit(new Callable<Integer>() {
- @Override
- public Integer call()
- throws Exception {
- return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentName, segmentFile, tableName)
- .getStatusCode();
- }
- }));
- }
- for (Future<Integer> task : tasks) {
- assertEquals((int) task.get(), HttpStatus.SC_OK);
+ if (numSegments == 1) {
+ File segmentTarFile = segmentTarFiles[0];
+ assertEquals(fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
+ HttpStatus.SC_OK);
+ } else {
+ // Upload all segments in parallel
+ ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
+ List<Future<Integer>> futures = new ArrayList<>(numSegments);
+ for (File segmentTarFile : segmentTarFiles) {
+ futures.add(executorService.submit(() -> fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName)
+ .getStatusCode()));
+ }
+ executorService.shutdown();
+ for (Future<Integer> future : futures) {
+ assertEquals((int) future.get(), HttpStatus.SC_OK);
+ }
}
- executor.shutdown();
}
}
- protected void addOfflineTable(String tableName)
- throws Exception {
- addOfflineTable(tableName, SegmentVersion.v1);
- }
-
- protected void addOfflineTable(String tableName, SegmentVersion segmentVersion)
- throws Exception {
- addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null, null);
- }
-
- protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
- String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
- SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
- throws Exception {
- TableConfig tableConfig =
- getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
- sortedColumn, "daily");
- sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
- }
-
- protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
- String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
- SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
- throws Exception {
- TableConfig tableConfig =
- getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
- sortedColumn, "daily");
- sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonString());
- }
-
- private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
- String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
- TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig, String sortedColumn,
- String segmentPushFrequency) {
- return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
- .setTimeType(timeType).setSegmentPushFrequency(segmentPushFrequency).setNumReplicas(1)
- .setBrokerTenant(brokerTenant).setServerTenant(serverTenant).setLoadMode(loadMode)
- .setSegmentVersion(segmentVersion.toString()).setInvertedIndexColumns(invertedIndexColumns)
- .setBloomFilterColumns(bloomFilterColumns).setRangeIndexColumns(rangeIndexColumns).setTaskConfig(taskConfig)
- .setSegmentPartitionConfig(segmentPartitionConfig).setSortedColumn(sortedColumn).build();
- }
-
- protected void dropOfflineTable(String tableName)
- throws Exception {
- sendDeleteRequest(
- _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.OFFLINE.tableNameWithType(tableName)));
- }
-
public static class AvroFileSchemaKafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSchemaKafkaAvroMessageDecoder.class);
public static File avroFile;
@@ -379,123 +302,6 @@ public abstract class ClusterTest extends ControllerTest {
}
}
- protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
- String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
- String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
- List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName)
- throws Exception {
- addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
- timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
- bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1);
- }
-
- protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
- String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
- String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
- List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas)
- throws Exception {
- addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
- timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
- bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, numReplicas, null);
- }
-
- protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
- String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
- String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
- TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas,
- List<FieldConfig> fieldConfigListForTextColumns)
- throws Exception {
- Map<String, String> streamConfigs = new HashMap<>();
- String streamType = "kafka";
- streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
- if (useLlc) {
- // LLC
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.LOWLEVEL.toString());
- streamConfigs.put(KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST), kafkaBrokerList);
- } else {
- // HLC
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.HIGHLEVEL.toString());
- streamConfigs.put(KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
- kafkaZkUrl);
- streamConfigs.put(KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER),
- kafkaBrokerList);
- }
- streamConfigs.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- streamConsumerFactoryName);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- kafkaTopic);
- AvroFileSchemaKafkaAvroMessageDecoder.avroFile = avroFile;
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- AvroFileSchemaKafkaAvroMessageDecoder.class.getName());
- streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(realtimeSegmentFlushRows));
- streamConfigs.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
-
- TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setLLC(useLlc)
- .setTimeColumnName(timeColumnName).setTimeType(timeType).setSchemaName(schemaName).setBrokerTenant(brokerTenant)
- .setServerTenant(serverTenant).setLoadMode(loadMode).setSortedColumn(sortedColumn)
- .setInvertedIndexColumns(invertedIndexColumns).setBloomFilterColumns(bloomFilterColumns)
- .setNoDictionaryColumns(noDictionaryColumns).setStreamConfigs(streamConfigs).setTaskConfig(taskConfig)
- .setNumReplicas(numReplicas).setFieldConfigList(fieldConfigListForTextColumns).build();
-
- // save the realtime table config
- _realtimeTableConfig = tableConfig;
-
- sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
- }
-
- protected void updateRealtimeTableConfig(String tablename, List<String> invertedIndexCols,
- List<String> bloomFilterCols)
- throws Exception {
-
- IndexingConfig config = _realtimeTableConfig.getIndexingConfig();
- config.setInvertedIndexColumns(invertedIndexCols);
- config.setBloomFilterColumns(bloomFilterCols);
-
- sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tablename), _realtimeTableConfig.toJsonString());
- }
-
- protected void updateRealtimeTableTenant(String tableName, TenantConfig tenantConfig)
- throws Exception {
-
- _realtimeTableConfig.setTenantConfig(tenantConfig);
-
- sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), _realtimeTableConfig.toJsonString());
- }
-
- protected void dropRealtimeTable(String tableName)
- throws Exception {
- sendDeleteRequest(
- _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
- }
-
- protected void addHybridTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
- String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
- String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
- List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName,
- SegmentPartitionConfig segmentPartitionConfig)
- throws Exception {
- addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
- invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig, sortedColumn);
- addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
- timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
- bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
- }
-
protected JsonNode getDebugInfo(final String uri)
throws Exception {
return JsonUtils.stringToJsonNode(sendGetRequest(_brokerBaseApiUrl + "/" + uri));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
new file mode 100644
index 0000000..86bc3a1
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ValidationMetrics;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TagOverrideConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for all {@link org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask}s.
+ * The intention of these tests is not to test functionality of daemons, but simply to check that they run as expected
+ * and process the tables when the controller starts.
+ */
+public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30;
+ private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5;
+ private static final String PERIODIC_TASK_FREQUENCY = "5s";
+
+ // Set an initial delay of 30 seconds for the periodic tasks, to allow time for the tables to be set up.
+ // Run them at a 5-second frequency to keep them running, in case the first run happens before the table setup.
+ private static class TestControllerConf extends ControllerConf {
+ private TestControllerConf(ControllerConf controllerConf) {
+ copy(controllerConf);
+ }
+
+ @Override
+ public long getStatusCheckerInitialDelayInSeconds() {
+ return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+ }
+
+ @Override
+ public int getStatusCheckerFrequencyInSeconds() {
+ return PERIODIC_TASK_FREQUENCY_SECONDS;
+ }
+
+ @Override
+ public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
+ return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+ }
+
+ @Override
+ public String getRealtimeSegmentRelocatorFrequency() {
+ return PERIODIC_TASK_FREQUENCY;
+ }
+
+ @Override
+ public long getBrokerResourceValidationInitialDelayInSeconds() {
+ return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+ }
+
+ @Override
+ public int getBrokerResourceValidationFrequencyInSeconds() {
+ return PERIODIC_TASK_FREQUENCY_SECONDS;
+ }
+
+ @Override
+ public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
+ return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+ }
+
+ @Override
+ public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+ return PERIODIC_TASK_FREQUENCY_SECONDS;
+ }
+ }
+
+ private static final int NUM_REPLICAS = 3;
+ private static final String TENANT_NAME = "TestTenant";
+
+ private String _currentTable = DEFAULT_TABLE_NAME;
+
+ @Override
+ protected String getTableName() {
+ return _currentTable;
+ }
+
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @Override
+ protected int getNumReplicas() {
+ return NUM_REPLICAS;
+ }
+
+ @Override
+ protected String getBrokerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
+ protected String getServerTenant() {
+ return TENANT_NAME;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ startZk();
+ startKafka();
+
+ TestControllerConf controllerConf = new TestControllerConf(getDefaultControllerConfiguration());
+ controllerConf.setTenantIsolationEnabled(false);
+ startController(controllerConf);
+ startBroker();
+ startServers(6);
+
+ // Create tenants
+ createBrokerTenant(TENANT_NAME, 1);
+ createServerTenant(TENANT_NAME, 3, 3);
+
+ // Unpack the Avro files
+ int numAvroFiles = unpackAvroData(_tempDir).size();
+ int numOfflineAvroFiles = 8;
+ int numRealtimeAvroFiles = 6;
+ // Avro files have to be ordered as time series data
+ List<File> avroFiles = new ArrayList<>(numAvroFiles);
+ for (int i = 1; i <= numAvroFiles; i++) {
+ avroFiles.add(new File(_tempDir, "On_Time_On_Time_Performance_2014_" + i + ".avro"));
+ }
+ List<File> offlineAvroFiles = avroFiles.subList(0, numOfflineAvroFiles);
+ List<File> realtimeAvroFiles = avroFiles.subList(numAvroFiles - numRealtimeAvroFiles, numAvroFiles);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+ addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils
+ .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Push data into Kafka
+ pushAvroIntoKafka(realtimeAvroFiles);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ String tableName = getTableName();
+ dropOfflineTable(tableName);
+ dropRealtimeTable(tableName);
+
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Test
+ public void testSegmentStatusChecker()
+ throws Exception {
+ String emptyTable = "emptyTable";
+ String disabledTable = "disabledTable";
+ String tableWithOfflineSegment = "tableWithOfflineSegment";
+
+ _currentTable = emptyTable;
+ addTableConfig(createOfflineTableConfig());
+
+ _currentTable = disabledTable;
+ addTableConfig(createOfflineTableConfig());
+ _helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), false);
+
+ _currentTable = tableWithOfflineSegment;
+ addTableConfig(createOfflineTableConfig());
+ uploadSegments(_currentTable, _tarDir);
+ // Turn one replica of a segment OFFLINE
+ HelixHelper.updateIdealState(_helixManager, TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment),
+ idealState -> {
+ assertNotNull(idealState);
+ Map<String, String> instanceStateMap = idealState.getRecord().getMapFields().values().iterator().next();
+ instanceStateMap.entrySet().iterator().next().setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ return idealState;
+ }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
+
+ _currentTable = DEFAULT_TABLE_NAME;
+
+ int numTables = 5;
+ ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
+ TestUtils.waitForCondition(aVoid -> {
+ if (controllerMetrics
+ .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker")
+ != numTables) {
+ return false;
+ }
+ if (!checkSegmentStatusCheckerMetrics(controllerMetrics, TableNameBuilder.OFFLINE.tableNameWithType(emptyTable),
+ null, 3, 100, 0, 100)) {
+ return false;
+ }
+ if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
+ TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, Long.MIN_VALUE, Long.MIN_VALUE,
+ Long.MIN_VALUE, Long.MIN_VALUE)) {
+ return false;
+ }
+ String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+ IdealState idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
+ if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, 3, 100, 0, 100)) {
+ return false;
+ }
+ tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment);
+ idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
+ if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, 2, 66, 0, 100)) {
+ return false;
+ }
+ tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
+ if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, 3, 100, 0, 100)) {
+ return false;
+ }
+ return controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT) == 4
+ && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT) == 1
+ && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT) == 1;
+ }, 60_000, "Timed out waiting for SegmentStatusChecker");
+
+ dropOfflineTable(emptyTable);
+ dropOfflineTable(disabledTable);
+ dropOfflineTable(tableWithOfflineSegment);
+ }
+
+ private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType,
+ IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState,
+ long expectedPercentSegmentsAvailable) {
+ if (idealState != null) {
+ if (controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE) != idealState
+ .toString().length()) {
+ return false;
+ }
+ if (controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT) != idealState
+ .getPartitionSet().size()) {
+ return false;
+ }
+ }
+ return controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS)
+ == expectedNumReplicas
+ && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS)
+ == expectedPercentReplicas
+ && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE)
+ == expectedSegmentsInErrorState
+ && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE)
+ == expectedPercentSegmentsAvailable;
+ }
+
+ @Test
+ public void testRealtimeSegmentRelocator()
+ throws Exception {
+ // Add relocation tenant config
+ TableConfig realtimeTableConfig = getRealtimeTableConfig();
+ realtimeTableConfig.setTenantConfig(new TenantConfig(TENANT_NAME, TENANT_NAME,
+ new TagOverrideConfig(TENANT_NAME + "_REALTIME", TENANT_NAME + "_OFFLINE")));
+ updateTableConfig(realtimeTableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ // Check that the servers hosting ONLINE segments and the servers hosting CONSUMING segments are disjoint sets
+ Set<String> consumingServers = new HashSet<>();
+ Set<String> completedServers = new HashSet<>();
+ IdealState idealState =
+ _helixResourceManager.getTableIdealState(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+ assertNotNull(idealState);
+ for (Map<String, String> instanceStateMap : idealState.getRecord().getMapFields().values()) {
+ for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+ String state = entry.getValue();
+ if (state.equals(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ consumingServers.add(entry.getKey());
+ } else if (state.equals(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+ completedServers.add(entry.getKey());
+ }
+ }
+ }
+ return Collections.disjoint(consumingServers, completedServers);
+ }, 60_000, "Timed out waiting for RealtimeSegmentRelocation");
+ }
+
+ @Test
+ public void testBrokerResourceValidationManager() {
+ // Add a new broker with the same tag
+ String brokerId = "Broker_localhost_1234";
+ InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(brokerId);
+ instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+ String helixClusterName = getHelixClusterName();
+ _helixAdmin.addInstance(helixClusterName, instanceConfig);
+ Set<String> brokersAfterAdd = _helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+ assertTrue(brokersAfterAdd.contains(brokerId));
+
+ String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+ TestUtils.waitForCondition(aVoid -> {
+ IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+ assertNotNull(idealState);
+ return idealState.getInstanceSet(tableNameWithType).equals(brokersAfterAdd);
+ }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+
+ // Drop the newly added broker
+ _helixAdmin.dropInstance(helixClusterName, instanceConfig);
+ Set<String> brokersAfterDrop = _helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+ assertFalse(brokersAfterDrop.contains(brokerId));
+
+ TestUtils.waitForCondition(input -> {
+ IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+ assertNotNull(idealState);
+ return idealState.getInstanceSet(tableNameWithType).equals(brokersAfterDrop);
+ }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+ }
+
+ @Test
+ public void testOfflineSegmentIntervalChecker() {
+ OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker();
+ ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics();
+ String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+ // Wait until OfflineSegmentIntervalChecker gets executed
+ TestUtils.waitForCondition(aVoid -> {
+ long numSegments =
+ validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "SegmentCount"));
+ long numMissingSegments =
+ validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "missingSegmentCount"));
+ long numTotalDocs =
+ validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "TotalDocumentCount"));
+ return numSegments == 8 && numMissingSegments == 0 && numTotalDocs == 79003;
+ }, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
+ }
+
+ // TODO: tests for other ControllerPeriodicTasks (RetentionManager, RealtimeSegmentValidationManager)
+}
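
The new test above follows the standardized setup flow this commit introduces: the schema and table config are created from the BaseClusterIntegrationTest settings, segments are built against that exact config, uploaded, and then the test waits for all documents to be queryable. As a rough illustration of how another integration test could reuse that flow, here is a minimal sketch; the class name ExampleClusterIntegrationTest and the single-server sizing are made up for the example, while the helper calls are the ones used in the diff above.

// Minimal sketch only; ExampleClusterIntegrationTest is a hypothetical name and the
// component counts are illustrative. The helper methods are the ones used in this diff.
package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.List;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;

public class ExampleClusterIntegrationTest extends BaseClusterIntegrationTestSet {

  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

    // Start the cluster components
    startZk();
    startController();
    startBroker();
    startServers(1);

    // Create and upload the schema and table config derived from the
    // BaseClusterIntegrationTest settings (table name, time column, indexes, ...)
    Schema schema = createSchema();
    addSchema(schema);
    TableConfig offlineTableConfig = createOfflineTableConfig();
    addTableConfig(offlineTableConfig);

    // Build segments from the unpacked Avro files against that table config, then upload them
    List<File> avroFiles = unpackAvroData(_tempDir);
    ClusterIntegrationTestUtils
        .buildSegmentsFromAvro(avroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
    uploadSegments(getTableName(), _tarDir);

    // Wait until the uploaded documents are queryable
    waitForAllDocsLoaded(600_000L);
  }
}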
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
deleted file mode 100644
index 6050314..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMeter;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.metrics.ValidationMetrics;
-import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
-import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.TagOverrideConfig;
-import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.ITestContext;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterGroups;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeGroups;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration tests for all {@link org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask}
- * The intention of these tests is not to test functionality of daemons,
- * but simply to check that they run as expected and process the tables when the controller starts.
- *
- * Cluster setup/teardown is common across all tests in the @BeforeClass method {@link ControllerPeriodicTasksIntegrationTests::setup}.
- * This includes:
- * zk, controller, 1 broker, 3 offline servers, 3 realtime servers, kafka with avro loaded, offline table with segments from avro
- *
- * There will be a separate beforeTask(), testTask() and afterTask() for each ControllerPeriodicTask test, grouped by task name.
- * See group = "segmentStatusChecker" for example.
- * The tables needed for the test will be created in beforeTask(), and dropped in afterTask()
- *
- * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation ->
- * brokerResourceValidationManager -> OfflineSegmentIntervalChecker ....
- */
-public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
-
- // Controller configuration used in this class.
- private class TestControllerConf extends ControllerConf {
- private TestControllerConf(ControllerConf controllerConf) {
- copy(controllerConf);
- }
-
- @Override
- public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
- return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
- }
-
- public long getStatusCheckerInitialDelayInSeconds() {
- return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
- }
-
- public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
- return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
- }
-
- public long getBrokerResourceValidationInitialDelayInSeconds() {
- return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
- }
-
- public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
- return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
- }
- }
-
- private static final String TENANT_NAME = "TestTenant";
- private static final String DEFAULT_TABLE_NAME = "mytable";
-
- private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 60;
- private static final int PERIODIC_TASK_FREQ_SECONDS = 5;
- private static final String PERIODIC_TASK_FREQ = "5s";
-
- private String _currentTableName;
- private List<File> _avroFiles;
-
- /**
- * Setup the cluster for the tests
- * @throws Exception
- */
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- startZk();
- startKafka();
-
- // Set initial delay of 60 seconds for periodic tasks, to allow time for tables setup.
- // Run at 5 seconds freq in order to keep them running, in case first run happens before table setup
- TestControllerConf controllerConf = new TestControllerConf(getDefaultControllerConfiguration());
- controllerConf.setTenantIsolationEnabled(false);
- controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
- controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
- controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
- controllerConf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
- controllerConf.setRealtimeSegmentValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
-
- startController(controllerConf);
- startBroker();
- startServers(6);
-
- // Create tenants
- createBrokerTenant(TENANT_NAME, 1);
- createServerTenant(TENANT_NAME, 3, 3);
-
- // unpack avro into _tempDir
- _avroFiles = unpackAvroData(_tempDir);
-
- // setup a default offline table, shared across all tests. Each test can create additional tables and destroy them
- setupOfflineTableAndSegments(DEFAULT_TABLE_NAME, _avroFiles);
-
- // push avro into kafka, each test can create the realtime table and destroy it
- ExecutorService executor = Executors.newCachedThreadPool();
- pushAvroIntoKafka(_avroFiles, getKafkaTopic(), executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
- }
-
- /**
- * Setup offline table, but no segments
- */
- private void setupOfflineTable(String table)
- throws Exception {
- addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null,
- null);
- }
-
- /**
- * Setup offline table, with segments from avro
- */
- private void setupOfflineTableAndSegments(String tableName, List<File> avroFiles)
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
- setTableName(tableName);
-
- File schemaFile = getSchemaFile();
- Schema schema = Schema.fromFile(schemaFile);
- String schemaName = schema.getSchemaName();
- addSchema(schemaFile, schemaName);
-
- String timeColumnName = getTimeColumnName();
- FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
- Assert.assertNotNull(fieldSpec);
- TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
- TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
- Assert.assertNotNull(outgoingTimeUnit);
- String timeType = outgoingTimeUnit.toString();
-
- addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null,
- null, null, null, null);
-
- ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, null, null, null, executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
- uploadSegments(getTableName(), _tarDir);
- waitForAllDocsLoaded(600_000L);
- }
-
- /**
- * Setup realtime table for given tablename and topic
- */
- private void setupRealtimeTable(String table, String topic, File avroFile)
- throws Exception {
- File schemaFile = getSchemaFile();
- Schema schema = Schema.fromFile(schemaFile);
- String schemaName = schema.getSchemaName();
- addSchema(schemaFile, schemaName);
-
- String timeColumnName = getTimeColumnName();
- FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
- Assert.assertNotNull(fieldSpec);
- TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
- TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
- Assert.assertNotNull(outgoingTimeUnit);
- String timeType = outgoingTimeUnit.toString();
-
- addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic,
- getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME,
- getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
- getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
- }
-
- @Override
- public String getTableName() {
- return _currentTableName;
- }
-
- private void setTableName(String tableName) {
- _currentTableName = tableName;
- }
-
- /**
- * Group - segmentStatusChecker - Integration tests for {@link org.apache.pinot.controller.helix.SegmentStatusChecker}
- * @throws Exception
- */
- @BeforeGroups(groups = "segmentStatusChecker")
- public void beforeTestSegmentStatusCheckerTest(ITestContext context)
- throws Exception {
- String emptyTable = "table1_OFFLINE";
- String disabledOfflineTable = "table2_OFFLINE";
- String basicOfflineTable = getDefaultOfflineTableName();
- String errorOfflineTable = "table4_OFFLINE";
- String basicRealtimeTable = getDefaultRealtimeTableName();
- int numTables = 5;
-
- context.setAttribute("emptyTable", emptyTable);
- context.setAttribute("disabledOfflineTable", disabledOfflineTable);
- context.setAttribute("basicOfflineTable", basicOfflineTable);
- context.setAttribute("errorOfflineTable", errorOfflineTable);
- context.setAttribute("basicRealtimeTable", basicRealtimeTable);
- context.setAttribute("numTables", numTables);
-
- // empty table
- setupOfflineTable(emptyTable);
-
- // table with disabled ideal state
- setupOfflineTable(disabledOfflineTable);
- _helixAdmin.enableResource(getHelixClusterName(), disabledOfflineTable, false);
-
- // some segments offline
- setupOfflineTableAndSegments(errorOfflineTable, _avroFiles);
- HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new Function<IdealState, IdealState>() {
- @Nullable
- @Override
- public IdealState apply(@Nullable IdealState input) {
- List<String> segmentNames = Lists.newArrayList(input.getPartitionSet());
- Collections.sort(segmentNames);
-
- Map<String, String> instanceStateMap1 = input.getInstanceStateMap(segmentNames.get(0));
- for (String instance : instanceStateMap1.keySet()) {
- instanceStateMap1.put(instance, "OFFLINE");
- break;
- }
- return input;
- }
- }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
-
- // setup default realtime table
- setupRealtimeTable(basicRealtimeTable, getKafkaTopic(), _avroFiles.get(0));
- }
-
- /**
- * After 1 run of SegmentStatusChecker the controllerMetrics will be set for each table
- * Validate that we are seeing the expected numbers
- */
- @Test(groups = "segmentStatusChecker")
- public void testSegmentStatusChecker(ITestContext context)
- throws Exception {
- String emptyTable = (String) context.getAttribute("emptyTable");
- String disabledOfflineTable = (String) context.getAttribute("disabledOfflineTable");
- String basicOfflineTable = (String) context.getAttribute("basicOfflineTable");
- String errorOfflineTable = (String) context.getAttribute("errorOfflineTable");
- String basicRealtimeTable = (String) context.getAttribute("basicRealtimeTable");
- int numTables = (int) context.getAttribute("numTables");
-
- ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
-
- TestUtils.waitForCondition(input -> controllerMetrics
- .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker") >= numTables,
- 240_000, "Timed out waiting for SegmentStatusChecker");
-
- // empty table - table1_OFFLINE
- // num replicas set from ideal state
- checkSegmentStatusCheckerMetrics(controllerMetrics, emptyTable, null, 3, 100, 0, 100);
-
- // disabled table - table2_OFFLINE
- // reset to defaults
- checkSegmentStatusCheckerMetrics(controllerMetrics, disabledOfflineTable, null, Long.MIN_VALUE, Long.MIN_VALUE,
- Long.MIN_VALUE, Long.MIN_VALUE);
-
- // happy path table - mytable_OFFLINE
- IdealState idealState = _helixResourceManager.getTableIdealState(basicOfflineTable);
- checkSegmentStatusCheckerMetrics(controllerMetrics, basicOfflineTable, idealState, 3, 100, 0, 100);
-
- // offline segments - table4_OFFLINE
- // 2 replicas available out of 3, percent 66
- idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
- checkSegmentStatusCheckerMetrics(controllerMetrics, errorOfflineTable, idealState, 2, 66, 0, 100);
-
- // happy path table - mytable_REALTIME
- idealState = _helixResourceManager.getTableIdealState(basicRealtimeTable);
- checkSegmentStatusCheckerMetrics(controllerMetrics, basicRealtimeTable, idealState, 1, 100, 0, 100);
-
- // Total metrics
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT), 4);
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT), 1);
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1);
- }
-
- private void checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableName,
- IdealState idealState, long numReplicas, long percentReplicas, long segmentsInErrorState,
- long percentSegmentsAvailable) {
- if (idealState != null) {
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
- idealState.toString().length());
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENT_COUNT),
- idealState.getPartitionSet().size());
- }
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS),
- numReplicas);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS),
- percentReplicas);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
- segmentsInErrorState);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
- percentSegmentsAvailable);
- }
-
- @AfterGroups(groups = "segmentStatusChecker")
- public void afterTestSegmentStatusChecker(ITestContext context)
- throws Exception {
- String emptyTable = (String) context.getAttribute("emptyTable");
- String disabledOfflineTable = (String) context.getAttribute("disabledOfflineTable");
- String errorOfflineTable = (String) context.getAttribute("errorOfflineTable");
- String basicRealtimeTable = (String) context.getAttribute("basicRealtimeTable");
-
- dropOfflineTable(emptyTable);
- dropOfflineTable(disabledOfflineTable);
- dropOfflineTable(errorOfflineTable);
- }
-
- /**
- * Group - realtimeSegmentRelocator - Integration tests for {@link org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator}
- * @param context
- * @throws Exception
- */
- @BeforeGroups(groups = "realtimeSegmentRelocator", dependsOnGroups = "segmentStatusChecker")
- public void beforeRealtimeSegmentRelocatorTest(ITestContext context)
- throws Exception {
- String relocationTable = getDefaultRealtimeTableName();
- context.setAttribute("relocationTable", relocationTable);
-
- // add tag override for relocation
- TenantConfig tenantConfig = new TenantConfig(TENANT_NAME, TENANT_NAME,
- new TagOverrideConfig(TENANT_NAME + "_REALTIME", TENANT_NAME + "_OFFLINE"));
- updateRealtimeTableTenant(TableNameBuilder.extractRawTableName(relocationTable), tenantConfig);
- }
-
- @Test(groups = "realtimeSegmentRelocator", dependsOnGroups = "segmentStatusChecker")
- public void testRealtimeSegmentRelocator(ITestContext context)
- throws Exception {
-
- String relocationTable = (String) context.getAttribute("relocationTable");
-
- ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
-
- long taskRunCount =
- controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
- .count();
- TestUtils.waitForCondition(input ->
- controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
- .count() > taskRunCount, 60_000, "Timed out waiting for RealtimeSegmentRelocation to run");
-
- Assert.assertTrue(controllerMetrics
- .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "RealtimeSegmentRelocator") > 0);
-
- // check servers for ONLINE segment and CONSUMING segments are disjoint sets
- Set<String> consuming = new HashSet<>();
- Set<String> completed = new HashSet<>();
- IdealState tableIdealState = _helixResourceManager.getTableIdealState(relocationTable);
- for (String partition : tableIdealState.getPartitionSet()) {
- Map<String, String> instanceStateMap = tableIdealState.getInstanceStateMap(partition);
- if (instanceStateMap.containsValue("CONSUMING")) {
- consuming.addAll(instanceStateMap.keySet());
- }
- if (instanceStateMap.containsValue("ONLINE")) {
- completed.addAll(instanceStateMap.keySet());
- }
- }
-
- Assert.assertTrue(Collections.disjoint(consuming, completed));
- }
-
- @BeforeGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
- public void beforeBrokerResourceValidationManagerTest(ITestContext context)
- throws Exception {
- String table1 = "testTable";
- String table2 = "testTable2";
- context.setAttribute("testTableOne", table1);
- context.setAttribute("testTableTwo", table2);
- setupOfflineTable(table1);
- }
-
- @Test(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
- public void testBrokerResourceValidationManager(ITestContext context)
- throws Exception {
- // Check that the first table we added doesn't need to be rebuilt(case where ideal state brokers and brokers in broker resource are the same.
- String table1 = (String) context.getAttribute("testTableOne");
- String table2 = (String) context.getAttribute("testTableTwo");
- TableConfig tableConfigOne = new TableConfigBuilder(TableType.OFFLINE).setTableName(table1).build();
- String partitionNameOne = tableConfigOne.getTableName();
-
- // Ensure that the broker resource is not rebuilt.
- TestUtils.waitForCondition(input -> {
- IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
- return idealState.getInstanceSet(partitionNameOne)
- .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
- }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
-
- // Add another table that needs to be rebuilt
- TableConfig offlineTableConfigTwo =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(table2).setBrokerTenant(TENANT_NAME)
- .setServerTenant(TENANT_NAME).build();
- _helixResourceManager.addTable(offlineTableConfigTwo);
- String partitionNameTwo = offlineTableConfigTwo.getTableName();
-
- // Add a new broker manually such that the ideal state is not updated and ensure that rebuild broker resource is called
- final String brokerId = "Broker_localhost_2";
- InstanceConfig instanceConfig = new InstanceConfig(brokerId);
- instanceConfig.setInstanceEnabled(true);
- instanceConfig.setHostName("Broker_localhost");
- instanceConfig.setPort("2");
- _helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
- _helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
- TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
-
- // Count the number of times we check on ideal state change, which is made by rebuild broker resource method.
- AtomicInteger count = new AtomicInteger();
- TestUtils.waitForCondition(input -> {
- count.getAndIncrement();
- IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
- return idealState.getInstanceSet(partitionNameTwo)
- .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
- }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
-
- // At least the broker resource won't be changed immediately.
- Assert.assertTrue(count.get() > 1);
-
- // Drop the instance so that broker resource doesn't match the current one.
- _helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
- count.set(0);
- TestUtils.waitForCondition(input -> {
- count.getAndIncrement();
- IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
- return idealState.getInstanceSet(partitionNameTwo)
- .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
- }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
-
- // At least the broker resource won't be changed immediately.
- Assert.assertTrue(count.get() > 1);
- }
-
- @AfterGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
- public void afterBrokerResourceValidationManagerTest(ITestContext context)
- throws Exception {
- String table1 = (String) context.getAttribute("testTableOne");
- String table2 = (String) context.getAttribute("testTableTwo");
- dropOfflineTable(table1);
- dropOfflineTable(table2);
- }
-
- @Test(groups = "offlineSegmentIntervalChecker", dependsOnGroups = "brokerResourceValidationManager")
- public void testOfflineSegmentIntervalChecker()
- throws Exception {
- OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker();
- ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics();
-
- String tablNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
-
- // Wait until OfflineSegmentIntervalChecker gets executed
- TestUtils.waitForCondition(
- input -> validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount"))
- > 0, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
-
- // Test the validation metrics values updated by OfflineSegmentIntervalChecker against the known values
- // from segment metadata
- Assert.assertEquals(
- validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount")), 12);
- Assert.assertEquals(
- validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "missingSegmentCount")), 0);
- Assert.assertEquals(
- validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "TotalDocumentCount")),
- 115545);
- }
-
- /**
- * Group - realtimeSegmentValidationManager - Integration tests for {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}
- * @param context
- * @throws Exception
- */
-
- @Test(groups = "realtimeSegmentValidationManager", dependsOnGroups = "offlineSegmentIntervalChecker")
- public void testRealtimeSegmentValidationManager(ITestContext context)
- throws Exception {
- ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
- long taskRunCount = controllerMetrics
- .getMeteredTableValue("RealtimeSegmentValidationManager", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
-
- // Wait until the RealtimeSegmentValidationManager runs at least once. Most likely it already ran once
- // on the realtime table (default one) already setup, so we should have the total document count on that
- // realtime table.
- TestUtils.waitForCondition(input -> controllerMetrics
- .getMeteredTableValue("RealtimeSegmentValidationManager", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count()
- > taskRunCount, 60_000, "Timed out waiting for RealtimeSegmentValidationManager to run");
-
- Assert.assertTrue(controllerMetrics
- .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "RealtimeSegmentValidationManager")
- > 0);
- RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
- ValidationMetrics validationMetrics = validationManager.getValidationMetrics();
- // Make sure we processed the realtime table to get the total document count. Should have been done the first
- // time RealtimeSegmentValidationManager ran on the default realtime table.
- Assert.assertTrue(validationMetrics
- .getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(), "TotalDocumentCount")) > 0);
- }
-
- // TODO: tests for other ControllerPeriodicTasks (RetentionManagert , RealtimeSegmentValidationManager)
-
- @Override
- protected boolean useLlc() {
- return true;
- }
-
- private String getDefaultOfflineTableName() {
- return DEFAULT_TABLE_NAME + "_OFFLINE";
- }
-
- private String getDefaultRealtimeTableName() {
- return DEFAULT_TABLE_NAME + "_REALTIME";
- }
-
- /**
- * Tear down the cluster after tests
- * @throws Exception
- */
- @AfterClass
- public void tearDown()
- throws Exception {
- stopServer();
- stopBroker();
- stopController();
- stopKafka();
- stopZk();
- FileUtils.deleteDirectory(_tempDir);
- }
-}
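
Compared with the deleted class above, which sequenced its checks through TestNG groups and BeforeGroups/AfterGroups fixtures, the refactored test polls the controller metrics until the periodic task has run. A condensed sketch of that polling pattern follows; it is meant to sit inside a BaseClusterIntegrationTestSet subclass, and the table name and expected gauge values are illustrative, not taken from a real run.

// Condensed sketch of the polling pattern used by the refactored test above; it relies
// on _controllerStarter from the test base class. The table name "myTable" and the
// expected gauge values are illustrative only.
private void waitForSegmentStatusCheckerGauges() {
  ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
  String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType("myTable");
  TestUtils.waitForCondition(aVoid ->
          controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS) == 3
              && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE) == 0,
      60_000, "Timed out waiting for SegmentStatusChecker to update the gauges");
}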
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index a4d302d..9fa9519 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.ConvertToRawIndexTask;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -56,10 +57,16 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
@Nullable
@Override
- protected List<String> getRawIndexColumns() {
+ protected List<String> getNoDictionaryColumns() {
return null;
}
+ // NOTE: Only allow converting raw index for v1 segment
+ @Override
+ protected String getSegmentVersion() {
+ return SegmentVersion.v1.name();
+ }
+
@Override
protected TableTaskConfig getTaskConfig() {
Map<String, String> convertToRawIndexTaskConfigs = new HashMap<>();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
deleted file mode 100644
index ea12bc5..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.io.Files;
-import com.yammer.metrics.core.MetricsRegistry;
-import java.io.File;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
-import org.apache.pinot.core.data.manager.realtime.SegmentCommitter;
-import org.apache.pinot.core.data.manager.realtime.SegmentCommitterFactory;
-import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.core.data.readers.PinotSegmentUtil;
-import org.apache.pinot.server.realtime.ControllerLeaderLocator;
-import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Integration test that extends RealtimeClusterIntegrationTest to test default commit but uses low-level Kafka consumer.
- */
-public class DefaultCommitterRealtimeIntegrationTest extends RealtimeClusterIntegrationTest {
- private File _indexDir;
- private File _realtimeSegmentUntarred;
-
- private static final String TARGZ_SUFFIX = ".tar.gz";
- private static final long END_OFFSET = 500L;
- private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
- private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCommitterRealtimeIntegrationTest.class);
-
- @BeforeClass
- @Override
- public void setUp()
- throws Exception {
- // Remove the consumer directory
- File consumerDirectory = new File(CONSUMER_DIRECTORY);
- if (consumerDirectory.exists()) {
- FileUtils.deleteDirectory(consumerDirectory);
- }
-
- startZk();
- startController();
- addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
- addFakeServerInstancesToAutoJoinHelixCluster(1, true);
-
- ControllerLeaderLocator.create(_helixManager);
-
- // Start Kafka
- startKafka();
- List<File> avroFiles = unpackAvroData(_tempDir);
-
- setUpRealtimeTable(avroFiles.get(0));
- buildSegment();
- }
-
- @Override
- protected boolean useLlc() {
- return true;
- }
-
- @Test
- public void testDefaultCommitter()
- throws Exception {
- ServerMetrics serverMetrics = new ServerMetrics(new MetricsRegistry());
- ServerSegmentCompletionProtocolHandler protocolHandler =
- new ServerSegmentCompletionProtocolHandler(serverMetrics, getTableName());
-
- LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor =
- mock(LLRealtimeSegmentDataManager.SegmentBuildDescriptor.class);
-
- RealtimeSegmentZKMetadata metadata = _helixResourceManager.getRealtimeSegmentMetadata(getTableName()).get(0);
-
- String instanceId = _helixResourceManager.getAllInstances().get(0);
-
- SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
- params.withSegmentName(metadata.getSegmentName()).withInstanceId(instanceId).withOffset(END_OFFSET);
-
- _realtimeSegmentUntarred = new File(_indexDir.getParentFile(), metadata.getSegmentName());
- FileUtils.copyDirectory(_indexDir, _realtimeSegmentUntarred);
-
- TarGzCompressionUtils.createTarGzOfDirectory(_realtimeSegmentUntarred.getAbsolutePath());
-
- // SegmentBuildDescriptor is currently not a static class, so we will mock this object.
- StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(END_OFFSET);
- when(segmentBuildDescriptor.getSegmentTarFilePath()).thenReturn(_realtimeSegmentUntarred + TARGZ_SUFFIX);
- when(segmentBuildDescriptor.getBuildTimeMillis()).thenReturn(0L);
- when(segmentBuildDescriptor.getOffset()).thenReturn(endOffset);
- when(segmentBuildDescriptor.getSegmentSizeBytes()).thenReturn(0L);
- when(segmentBuildDescriptor.getWaitTimeMillis()).thenReturn(0L);
-
- // Get realtime segment name
- String segmentList =
- sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPIWithTableType(getTableName(), "REALTIME"));
- JsonNode realtimeSegmentsList = getSegmentsFromJsonSegmentAPI(segmentList, TableType.REALTIME.toString());
- String segmentName = realtimeSegmentsList.get(0).asText();
-
- // Send segmentConsumed request
- sendGetRequest("http://localhost:" + DEFAULT_CONTROLLER_PORT + "/segmentConsumed?instance=" + instanceId + "&name="
- + segmentName + "&offset=" + END_OFFSET);
-
- SegmentCommitterFactory segmentCommitterFactory = new SegmentCommitterFactory(LOGGER, protocolHandler);
- SegmentCommitter segmentCommitter = segmentCommitterFactory.createDefaultSegmentCommitter(params);
- segmentCommitter.commit(segmentBuildDescriptor);
- }
-
- public void buildSegment()
- throws Exception {
- Schema schema = _helixResourceManager.getSchema(getTableName());
- TableConfig tableConfig = _helixResourceManager
- .getTableConfig(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(getTableName()));
- String _segmentOutputDir = Files.createTempDir().toString();
- List<GenericRow> _rows = PinotSegmentUtil.createTestData(schema, 1);
- RecordReader _recordReader = new GenericRowRecordReader(_rows);
-
- _indexDir = PinotSegmentUtil.createSegment(tableConfig, schema, "segmentName", _segmentOutputDir, _recordReader);
- }
-
- private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
- throws Exception {
- return JsonUtils.stringToJsonNode(json).get(0).get(type);
- }
-
- @Override
- public void tearDown()
- throws Exception {
- super.tearDown();
- _indexDir.deleteOnExit();
- _realtimeSegmentUntarred.deleteOnExit();
- }
-
- @Override
- public void testDictionaryBasedQueries()
- throws Exception {
- }
-
- @Override
- public void testGeneratedQueriesWithMultiValues()
- throws Exception {
-
- }
-
- @Override
- public void testHardcodedSqlQueries()
- throws Exception {
- }
-
- @Override
- public void testInstanceShutdown()
- throws Exception {
- }
-
- @Override
- public void testQueriesFromQueryFile()
- throws Exception {
- }
-
- @Override
- public void testQueryExceptions()
- throws Exception {
- }
-
- @Override
- public void testSqlQueriesFromQueryFile()
- throws Exception {
- }
-}
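
The removed DefaultCommitterRealtimeIntegrationTest above boiled down to exercising the default segment committer, and the core interaction it covered is only a few lines. A condensed sketch, assuming protocolHandler, params and segmentBuildDescriptor are prepared exactly as in the deleted setUp() and testDefaultCommitter() methods:

// Condensed from the deleted test above: committing a realtime segment through the
// default committer. protocolHandler, params and segmentBuildDescriptor are assumed
// to be built exactly as in the deleted setUp()/testDefaultCommitter() methods.
SegmentCommitterFactory segmentCommitterFactory = new SegmentCommitterFactory(LOGGER, protocolHandler);
SegmentCommitter segmentCommitter = segmentCommitterFactory.createDefaultSegmentCommitter(params);
segmentCommitter.commit(segmentBuildDescriptor);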
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index ac3b378..b05244f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.testng.annotations.BeforeClass;
/**
@@ -36,13 +35,6 @@ import org.testng.annotations.BeforeClass;
*/
public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
- @BeforeClass
- @Override
- public void setUp()
- throws Exception {
- super.setUp();
- }
-
@Override
protected String getStreamConsumerFactoryClassName() {
return FlakyStreamFactory.class.getName();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index e9e8e47..d434d9d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -23,20 +23,15 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -53,8 +48,6 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final int NUM_OFFLINE_SEGMENTS = 8;
private static final int NUM_REALTIME_SEGMENTS = 6;
- private Schema _schema;
-
protected int getNumOfflineSegments() {
return NUM_OFFLINE_SEGMENTS;
}
@@ -63,6 +56,21 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
return NUM_REALTIME_SEGMENTS;
}
+ @Override
+ protected String getBrokerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
+ protected String getServerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
+ protected void overrideServerConf(Configuration configuration) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT, true);
+ }
+
@BeforeClass
public void setUp()
throws Exception {
@@ -75,32 +83,26 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles);
List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles);
- ExecutorService executor = Executors.newCachedThreadPool();
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+ addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
- // Create segments from Avro data
- File schemaFile = getSchemaFile();
- _schema = Schema.fromFile(schemaFile);
+ // Create and upload segments
ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(offlineAvroFiles, 0, _segmentDir, _tarDir, getTableName(), null, getRawIndexColumns(),
- _schema, executor);
-
- // Push data into the Kafka topic
- pushAvroIntoKafka(realtimeAvroFiles, getKafkaTopic(), executor);
-
- // Load data into H2
- setUpH2Connection(avroFiles, executor);
-
- // Initialize query generator
- setUpQueryGenerator(avroFiles, executor);
+ .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
+ // Push data into Kafka
+ pushAvroIntoKafka(realtimeAvroFiles);
- // Create Pinot table
- setUpTable(avroFiles.get(0));
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
- // Upload all segments
- uploadSegments(getTableName(), _tarDir);
+ // Initialize the query generator
+ setUpQueryGenerator(avroFiles);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
@@ -124,30 +126,6 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
createServerTenant(TENANT_NAME, 1, 1);
}
- protected void setUpTable(File avroFile)
- throws Exception {
- String schemaName = _schema.getSchemaName();
- addSchema(getSchemaFile(), schemaName);
-
- String timeColumnName = getTimeColumnName();
- FieldSpec fieldSpec = _schema.getFieldSpecFor(timeColumnName);
- Assert.assertNotNull(fieldSpec);
- TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
- TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
- Assert.assertNotNull(outgoingTimeUnit);
- String timeType = outgoingTimeUnit.toString();
-
- addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
- getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
- TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRangeIndexColumns(),
- getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
- }
-
- @Override
- protected void overrideServerConf(Configuration configuration) {
- configuration.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT, true);
- }
-
protected List<File> getAllAvroFiles()
throws Exception {
// Unpack the Avro files
@@ -308,7 +286,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
public void tearDown()
throws Exception {
// Try deleting the tables and check that they have no routing table
- final String tableName = getTableName();
+ String tableName = getTableName();
dropOfflineTable(tableName);
dropRealtimeTable(tableName);
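For orientation, the standardized setup flow that the hybrid test now follows is condensed below. This is an illustrative sketch, not part of the patch: it reuses the helper names visible in the diff (createSchema, createOfflineTableConfig, createRealtimeTableConfig, addSchema, addTableConfig, buildSegmentsFromAvro, uploadSegments, pushAvroIntoKafka, setUpH2Connection, setUpQueryGenerator, waitForAllDocsLoaded) and assumes the cluster, Kafka and the Avro file lists are already prepared as in the test's setUp().

    // Sketch only: the standardized hybrid-table setup using the shared helpers,
    // with the Avro file lists passed in rather than derived inside the method.
    import java.io.File;
    import java.util.List;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.data.Schema;

    public abstract class StandardizedHybridSetupSketch extends BaseClusterIntegrationTestSet {

      // Assumes ZK, controller, broker, servers and Kafka are already running.
      void createTablesAndLoadData(List<File> avroFiles, List<File> offlineAvroFiles, List<File> realtimeAvroFiles)
          throws Exception {
        // Schema and table configs come from the overrides declared on the test class
        Schema schema = createSchema();
        addSchema(schema);
        TableConfig offlineTableConfig = createOfflineTableConfig();
        addTableConfig(offlineTableConfig);
        addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));

        // Offline side: segments are built from the same table config and schema that were uploaded
        ClusterIntegrationTestUtils
            .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
        uploadSegments(getTableName(), _tarDir);

        // Realtime side: push the realtime portion of the dataset into Kafka
        pushAvroIntoKafka(realtimeAvroFiles);

        // H2 and the query generator back the query-correctness checks
        setUpH2Connection(avroFiles);
        setUpQueryGenerator(avroFiles);
        waitForAllDocsLoaded(600_000L);
      }
    }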
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index 8935966..9df57ee 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -28,22 +28,20 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.query.comparison.QueryComparison;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.ITestResult;
@@ -112,8 +110,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
CustomHybridClusterIntegrationTest._tableName = args[argIdx++];
File schemaFile = new File(args[argIdx++]);
- Preconditions.checkState(schemaFile.isFile());
- CustomHybridClusterIntegrationTest._schemaFile = schemaFile;
+ CustomHybridClusterIntegrationTest._schema = Schema.fromFile(schemaFile);
File dataDir = new File(args[argIdx++]);
Preconditions.checkState(dataDir.isDirectory());
CustomHybridClusterIntegrationTest._dataDir = dataDir;
@@ -165,11 +162,11 @@ public class HybridClusterIntegrationTestCommandLineRunner {
private static final String TENANT_NAME = "TestTenant";
private static final int ZK_PORT = 3191;
private static final String ZK_STR = "localhost:" + ZK_PORT;
- private static final String KAFKA_ZK_STR = ZK_STR + "/kafka";
+ private static final int NUM_KAFKA_BROKERS = 1;
private static final int KAFKA_PORT = 20092;
- private static final String KAFKA_BROKER = "localhost:" + KAFKA_PORT;
+ private static final String KAFKA_ZK_STR = ZK_STR + "/kafka";
private static final int CONTROLLER_PORT = 9998;
- private static final int BROKER_BASE_PORT = 19099;
+ private static final int BROKER_PORT = 19099;
private static final int SERVER_BASE_ADMIN_API_PORT = 9097;
private static final int SERVER_BASE_NETTY_PORT = 9098;
@@ -181,7 +178,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
private static boolean _enabled = false;
private static boolean _useLlc = false;
private static String _tableName;
- private static File _schemaFile;
+ private static Schema _schema;
private static File _dataDir;
private static List<String> _invertedIndexColumns;
private static String _sortedColumn;
@@ -190,7 +187,6 @@ public class HybridClusterIntegrationTestCommandLineRunner {
private List<File> _realtimeAvroFiles;
private File _queryFile;
private File _responseFile;
- private StreamDataServerStartable _kafkaStarter;
private long _countStarResult;
public CustomHybridClusterIntegrationTest() {
@@ -222,13 +218,24 @@ public class HybridClusterIntegrationTestCommandLineRunner {
Assert.assertTrue(_responseFile.isFile());
}
- @Nonnull
@Override
protected String getTableName() {
return _tableName;
}
@Override
+ protected String getSchemaName() {
+ return _schema.getSchemaName();
+ }
+
+ @Nullable
+ @Override
+ protected String getTimeColumnName() {
+ TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+ return timeFieldSpec != null ? timeFieldSpec.getName() : null;
+ }
+
+ @Override
protected long getCountStarResult() {
return _countStarResult;
}
@@ -244,6 +251,64 @@ public class HybridClusterIntegrationTestCommandLineRunner {
}
@Override
+ protected int getNumKafkaBrokers() {
+ return NUM_KAFKA_BROKERS;
+ }
+
+ @Override
+ protected int getBaseKafkaPort() {
+ return KAFKA_PORT;
+ }
+
+ @Override
+ protected String getKafkaZKAddress() {
+ return KAFKA_ZK_STR;
+ }
+
+ @Override
+ protected String getSortedColumn() {
+ return _sortedColumn;
+ }
+
+ @Override
+ protected List<String> getInvertedIndexColumns() {
+ return _invertedIndexColumns;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getRangeIndexColumns() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getBloomFilterColumns() {
+ return null;
+ }
+
+ @Override
+ protected String getLoadMode() {
+ return ReadMode.mmap.name();
+ }
+
+ @Override
+ protected String getBrokerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
+ protected String getServerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
protected long getCurrentCountStarResult()
throws Exception {
return postQuery("SELECT COUNT(*) FROM " + getTableName()).get("aggregationResults").get(0).get("value").asLong();
@@ -260,11 +325,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
// Start Zk and Kafka
startZk(ZK_PORT);
- _kafkaStarter = KafkaStarterUtils.startServer(KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR,
- KafkaStarterUtils.getDefaultKafkaConfiguration());
-
- // Create Kafka topic
- _kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
+ startKafka();
// Start the Pinot cluster
ControllerConf config = getDefaultControllerConfiguration();
@@ -272,37 +333,22 @@ public class HybridClusterIntegrationTestCommandLineRunner {
config.setZkStr(ZK_STR);
config.setTenantIsolationEnabled(false);
startController(config);
- startBroker(BROKER_BASE_PORT, ZK_STR);
+ startBroker(BROKER_PORT, ZK_STR);
startServers(2, SERVER_BASE_ADMIN_API_PORT, SERVER_BASE_NETTY_PORT, ZK_STR);
// Create tenants
createBrokerTenant(TENANT_NAME, 1);
createServerTenant(TENANT_NAME, 1, 1);
- // Create segments from Avro data
- ExecutorService executor = Executors.newCachedThreadPool();
- Schema schema = Schema.fromFile(_schemaFile);
+ // Create and upload the schema and table config
+ addSchema(_schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+ addTableConfig(createRealtimeTableConfig(_realtimeAvroFiles.get(0)));
+
+ // Create and upload segments
ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(_offlineAvroFiles, 0, _segmentDir, _tarDir, _tableName, null, getRawIndexColumns(),
- schema, executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
-
- // Create Pinot table
- String schemaName = schema.getSchemaName();
- addSchema(_schemaFile, schemaName);
- String timeColumnName = getTimeColumnName();
- FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
- Assert.assertNotNull(fieldSpec);
- TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
- TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
- Assert.assertNotNull(outgoingTimeUnit);
- String timeType = outgoingTimeUnit.toString();
- addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
- _realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
- _sortedColumn, _invertedIndexColumns, null, null, null, null, getStreamConsumerFactoryClassName(), null);
-
- // Upload all segments
+ .buildSegmentsFromAvro(_offlineAvroFiles, offlineTableConfig, _schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);
}
@@ -320,9 +366,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
try (BufferedReader responseFileReader = new BufferedReader(new FileReader(_responseFile))) {
for (File realtimeAvroFile : _realtimeAvroFiles) {
// Push one avro file into the Kafka topic
- ClusterIntegrationTestUtils
- .pushAvroIntoKafka(Collections.singletonList(realtimeAvroFile), KAFKA_BROKER, getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+ pushAvroIntoKafka(Collections.singletonList(realtimeAvroFile));
try (BufferedReader queryFileReader = new BufferedReader(new FileReader(_queryFile))) {
// Set the expected COUNT(*) result and wait for all documents loaded
@@ -344,7 +388,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
public void run() {
try {
JsonNode actualResponse =
- postQuery(new PinotQueryRequest("pql", currentQuery), "http://localhost:" + BROKER_BASE_PORT);
+ postQuery(new PinotQueryRequest("pql", currentQuery), "http://localhost:" + BROKER_PORT);
if (QueryComparison.compareWithEmpty(actualResponse, expectedResponse)
== QueryComparison.ComparisonStatus.FAILED) {
numFailedQueries.getAndIncrement();
@@ -383,7 +427,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
stopServer();
stopBroker();
stopController();
- _kafkaStarter.stop();
+ stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
index db22049..1bb6018 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,25 +28,19 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -55,128 +48,75 @@ import org.testng.annotations.Test;
public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
+ private static final int NUM_TOTAL_DOCS = 1000;
+ private final List<String> sortedSequenceIds = new ArrayList<>(NUM_TOTAL_DOCS);
- protected static final String DEFAULT_TABLE_NAME = "myTable";
- static final long TOTAL_DOCS = 1_000L;
- private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathClusterIntegrationTest.class);
- private final List<String> sortedSequenceIds = new ArrayList<>();
- protected Schema _schema;
- private String _currentTable;
-
- @Nonnull
- @Override
- protected String getTableName() {
- return _currentTable;
- }
-
- @Nonnull
@Override
- protected String getSchemaFileName() {
- return "";
+ protected long getCountStarResult() {
+ return NUM_TOTAL_DOCS;
}
@BeforeClass
public void setUp()
throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
startController();
startBroker();
- startServers(1);
-
- _schema = new Schema();
-
- FieldSpec jsonFieldSpec = new DimensionFieldSpec();
- jsonFieldSpec.setDataType(DataType.STRING);
- jsonFieldSpec.setDefaultNullValue("");
- jsonFieldSpec.setName("myMap");
- jsonFieldSpec.setSingleValueField(true);
- _schema.addField(jsonFieldSpec);
- FieldSpec jsonStrFieldSpec = new DimensionFieldSpec();
- jsonStrFieldSpec.setDataType(DataType.STRING);
- jsonStrFieldSpec.setDefaultNullValue("");
- jsonStrFieldSpec.setName("myMapStr");
- jsonStrFieldSpec.setSingleValueField(true);
- _schema.addField(jsonStrFieldSpec);
- FieldSpec complexJsonStrFieldSpec = new DimensionFieldSpec();
- complexJsonStrFieldSpec.setDataType(DataType.STRING);
- complexJsonStrFieldSpec.setDefaultNullValue("");
- complexJsonStrFieldSpec.setName("complexMapStr");
- complexJsonStrFieldSpec.setSingleValueField(true);
- _schema.addField(complexJsonStrFieldSpec);
-
- // Create the tables
- ArrayList<String> invertedIndexColumns = Lists.newArrayList();
- addOfflineTable(DEFAULT_TABLE_NAME, null, null, null, null, null, SegmentVersion.v1, invertedIndexColumns, null,
- null, null, null, null);
-
- setUpSegmentsAndQueryGenerator();
+ startServer();
+
+ // Create and upload the schema and table config
+ String rawTableName = getTableName();
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName(rawTableName).addSingleValueDimension("myMap", DataType.STRING)
+ .addSingleValueDimension("myMapStr", DataType.STRING)
+ .addSingleValueDimension("complexMapStr", DataType.STRING).build();
+ addSchema(schema);
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).build();
+ addTableConfig(tableConfig);
+
+ // Create and upload segments
+ File avroFile = createAvroFile();
+ ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(rawTableName, _tarDir);
// Wait for all documents loaded
- _currentTable = DEFAULT_TABLE_NAME;
waitForAllDocsLoaded(60_000);
}
- @Override
- protected long getCountStarResult() {
- return TOTAL_DOCS;
- }
-
- protected void setUpSegmentsAndQueryGenerator()
+ private File createAvroFile()
throws Exception {
- List<Field> fields = new ArrayList<>();
- fields.add(new Field("myMapStr", org.apache.avro.Schema.create(Type.STRING), "", null));
- fields.add(new Field("complexMapStr", org.apache.avro.Schema.create(Type.STRING), "", null));
- org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", "some desc", null, false);
+ org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ List<Field> fields = Arrays.asList(new Field("myMapStr", org.apache.avro.Schema.create(Type.STRING), null, null),
+ new Field("complexMapStr", org.apache.avro.Schema.create(Type.STRING), null, null));
avroSchema.setFields(fields);
- DataFileWriter<GenericData.Record> recordWriter =
- new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(avroSchema));
- String parent = "/tmp/mapTest";
- File avroFile = new File(parent, "part-" + 0 + ".avro");
- avroFile.getParentFile().mkdirs();
- recordWriter.create(avroSchema, avroFile);
-
- for (int i = 0; i < TOTAL_DOCS; i++) {
- Map<String, String> map = new HashMap<>();
- map.put("k1", "value-k1-" + i);
- map.put("k2", "value-k2-" + i);
- GenericData.Record record = new GenericData.Record(avroSchema);
- record.put("myMapStr", JsonUtils.objectToString(map));
-
- Map<String, Object> complexMap = new HashMap<>();
- complexMap.put("k1", "value-k1-" + i);
- complexMap.put("k2", "value-k2-" + i);
- complexMap.put("k3", Arrays.asList("value-k3-0-" + i, "value-k3-1-" + i, "value-k3-2-" + i));
- complexMap.put("k4", ImmutableMap
- .of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i, "k4-k3", "value-k4-k3-" + i, "met", i));
- record.put("complexMapStr", JsonUtils.objectToString(complexMap));
- recordWriter.append(record);
- sortedSequenceIds.add(String.valueOf(i));
+ File avroFile = new File(_tempDir, "data.avro");
+ try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+ for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
+ Map<String, String> map = new HashMap<>();
+ map.put("k1", "value-k1-" + i);
+ map.put("k2", "value-k2-" + i);
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put("myMapStr", JsonUtils.objectToString(map));
+
+ Map<String, Object> complexMap = new HashMap<>();
+ complexMap.put("k1", "value-k1-" + i);
+ complexMap.put("k2", "value-k2-" + i);
+ complexMap.put("k3", Arrays.asList("value-k3-0-" + i, "value-k3-1-" + i, "value-k3-2-" + i));
+ complexMap.put("k4", ImmutableMap
+ .of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i, "k4-k3", "value-k4-k3-" + i, "met", i));
+ record.put("complexMapStr", JsonUtils.objectToString(complexMap));
+ fileWriter.append(record);
+ sortedSequenceIds.add(String.valueOf(i));
+ }
}
Collections.sort(sortedSequenceIds);
- recordWriter.close();
-
- // Unpack the Avro files
- List<File> avroFiles = Lists.newArrayList(avroFile);
-
- // Create and upload segments without star tree indexes from Avro data
- createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
- }
-
- private void createAndUploadSegments(List<File> avroFiles, String tableName, boolean createStarTreeIndex)
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-
- ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, null, null, _schema, executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
- uploadSegments(getTableName(), _tarDir);
+ return avroFile;
}
@Test
@@ -237,7 +177,6 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
JsonNode pinotResponse = postQuery(pqlQuery);
ArrayNode selectionResults = (ArrayNode) pinotResponse.get("selectionResults").get("results");
- LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, selectionResults);
Assert.assertNotNull(selectionResults);
Assert.assertTrue(selectionResults.size() > 0);
for (int i = 0; i < selectionResults.size(); i++) {
@@ -264,9 +203,8 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
+ " where jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') = 'value-k4-k1-0'";
pinotResponse = postQuery(pqlQuery);
selectionResults = (ArrayNode) pinotResponse.get("selectionResults").get("results");
- LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, selectionResults);
Assert.assertNotNull(selectionResults);
- Assert.assertTrue(selectionResults.size() == 1);
+ Assert.assertEquals(selectionResults.size(), 1);
for (int i = 0; i < selectionResults.size(); i++) {
String value = selectionResults.get(i).get(0).textValue();
Assert.assertEquals(value,
@@ -275,17 +213,16 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
//selection order by
pqlQuery = "Select complexMapStr from " + DEFAULT_TABLE_NAME
- + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') DESC LIMIT " + TOTAL_DOCS;
+ + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') DESC LIMIT " + NUM_TOTAL_DOCS;
pinotResponse = postQuery(pqlQuery);
selectionResults = (ArrayNode) pinotResponse.get("selectionResults").get("results");
- LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, selectionResults);
Assert.assertNotNull(selectionResults);
Assert.assertTrue(selectionResults.size() > 0);
for (int i = 0; i < selectionResults.size(); i++) {
String value = selectionResults.get(i).get(0).textValue();
Assert.assertTrue(value.indexOf("-k1-") > 0);
Map results = JsonUtils.stringToObject(value, Map.class);
- String seqId = sortedSequenceIds.get((int) (TOTAL_DOCS - 1 - i));
+ String seqId = sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
Assert.assertEquals(results.get("k1"), "value-k1-" + seqId);
Assert.assertEquals(results.get("k2"), "value-k2-" + seqId);
final List k3 = (List) results.get("k3");
@@ -303,14 +240,13 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
pqlQuery = "Select sum(jsonExtractScalar(complexMapStr,'$.k4.met','INT')) from " + DEFAULT_TABLE_NAME
+ " group by jsonExtractScalar(complexMapStr,'$.k1','STRING')";
pinotResponse = postQuery(pqlQuery);
- LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, pinotResponse);
Assert.assertNotNull(pinotResponse.get("aggregationResults"));
JsonNode groupByResult = pinotResponse.get("aggregationResults").get(0).get("groupByResult");
Assert.assertNotNull(groupByResult);
Assert.assertTrue(groupByResult.isArray());
Assert.assertTrue(groupByResult.size() > 0);
for (int i = 0; i < groupByResult.size(); i++) {
- String seqId = sortedSequenceIds.get((int) (TOTAL_DOCS - 1 - i));
+ String seqId = sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
final JsonNode groupbyRes = groupByResult.get(i);
Assert.assertEquals(groupbyRes.get("group").get(0).asText(), "value-k1-" + seqId);
Assert.assertEquals(groupbyRes.get("value").asDouble(), Double.parseDouble(seqId));
@@ -320,7 +256,7 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
@AfterClass
public void tearDown()
throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
+ dropOfflineTable(getTableName());
stopServer();
stopBroker();
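As an illustration of what the rewritten JSON-path test exercises, the sketch below shows the per-record JSON payload and the shape of the jsonExtractScalar queries issued against it. The column name complexMapStr, the JSON paths and the map layout are taken from the test above; the table name myTable and the loop index are placeholders.

    // Sketch only: the JSON string stored per record and the kind of
    // jsonExtractScalar queries run against it.
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.pinot.spi.utils.JsonUtils;

    public class JsonPathQuerySketch {
      public static void main(String[] args)
          throws Exception {
        int i = 0;
        Map<String, Object> complexMap = new HashMap<>();
        complexMap.put("k1", "value-k1-" + i);
        complexMap.put("k2", "value-k2-" + i);
        complexMap.put("k3", Arrays.asList("value-k3-0-" + i, "value-k3-1-" + i, "value-k3-2-" + i));
        complexMap.put("k4", ImmutableMap
            .of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i, "k4-k3", "value-k4-k3-" + i, "met", i));
        // This string is what lands in the single-value STRING column 'complexMapStr'
        String complexMapStr = JsonUtils.objectToString(complexMap);

        // Query shapes used by the test: filter, order-by and group-by on JSON paths
        String filterQuery = "SELECT complexMapStr FROM myTable"
            + " WHERE jsonExtractScalar(complexMapStr, '$.k4.k4-k1', 'STRING') = 'value-k4-k1-0'";
        String groupByQuery = "SELECT SUM(jsonExtractScalar(complexMapStr, '$.k4.met', 'INT')) FROM myTable"
            + " GROUP BY jsonExtractScalar(complexMapStr, '$.k1', 'STRING')";
        System.out.println(complexMapStr);
        System.out.println(filterQuery);
        System.out.println(groupByQuery);
      }
    }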
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 5ba2b5a..d77862f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -24,10 +24,10 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import org.apache.avro.reflect.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -56,58 +56,57 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
private final boolean _isDirectAlloc = RANDOM.nextBoolean();
private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+ private final boolean _enableSplitCommit = RANDOM.nextBoolean();
private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
private final long _startTime = System.currentTimeMillis();
- @BeforeClass
@Override
- public void setUp()
- throws Exception {
- System.out.println(String.format(
- "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableLeadControllerResource: %s",
- RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableLeadControllerResource));
-
- // Remove the consumer directory
- File consumerDirectory = new File(CONSUMER_DIRECTORY);
- if (consumerDirectory.exists()) {
- FileUtils.deleteDirectory(consumerDirectory);
- }
+ protected boolean useLlc() {
+ return true;
+ }
- super.setUp();
+ @Override
+ protected String getLoadMode() {
+ return ReadMode.mmap.name();
}
@Override
public void startController() {
ControllerConf controllerConfig = getDefaultControllerConfiguration();
controllerConfig.setHLCTablesAllowed(false);
- controllerConfig.setSplitCommit(true);
+ controllerConfig.setSplitCommit(_enableSplitCommit);
startController(controllerConfig);
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
}
@Override
- protected boolean useLlc() {
- return true;
- }
-
- @Nullable
- @Override
- protected String getLoadMode() {
- return "MMAP";
- }
-
- @Override
protected void overrideServerConf(Configuration configuration) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
if (_isConsumerDirConfigured) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
}
- configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
- configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+ if (_enableSplitCommit) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+ }
configuration.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT, true);
}
+ @BeforeClass
+ @Override
+ public void setUp()
+ throws Exception {
+ System.out.println(String.format(
+ "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s",
+ RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));
+
+ // Remove the consumer directory
+ FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
+
+ super.setUp();
+ }
+
@Test
public void testConsumerDirectoryExists() {
File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
@@ -136,8 +135,10 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertTrue(queryResponse.get("numEntriesScannedInFilter").asLong() > 0L);
- updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null);
- sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null);
+ TableConfig tableConfig = getRealtimeTableConfig();
+ tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
+ updateTableConfig(tableConfig);
+ reloadRealtimeTable(getTableName());
TestUtils.waitForCondition(aVoid -> {
try {
@@ -165,6 +166,6 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
@Test
public void testReload()
throws Exception {
- super.testReload(false);
+ testReload(false);
}
}
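The index-reload flow introduced above can be read as the following pattern. This is a sketch built only from the helpers that appear in the diff (getRealtimeTableConfig, updateTableConfig, reloadRealtimeTable, postQuery, TestUtils.waitForCondition); the column DivActualElapsedTime, the filter value and the zero-scan check are illustrative assumptions, not taken from this hunk.

    // Sketch only: update an index setting on a live realtime table, trigger a reload,
    // and poll until the reload is visible to queries.
    import java.util.Collections;
    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.util.TestUtils;

    public abstract class ReloadSketch extends RealtimeClusterIntegrationTest {

      void updateInvertedIndexAndReload()
          throws Exception {
        // Fetch the current table config from the controller, mutate it, and push it back
        TableConfig tableConfig = getRealtimeTableConfig();
        tableConfig.getIndexingConfig().setInvertedIndexColumns(Collections.singletonList("DivActualElapsedTime"));
        updateTableConfig(tableConfig);

        // Ask the servers to reload the realtime table so the new index is built
        reloadRealtimeTable(getTableName());

        // Poll until the reload takes effect (assumed check: the filter stops scanning entries)
        TestUtils.waitForCondition(aVoid -> {
          try {
            JsonNode response =
                postQuery("SELECT COUNT(*) FROM " + getTableName() + " WHERE DivActualElapsedTime = 305");
            return response.get("numEntriesScannedInFilter").asLong() == 0L;
          } catch (Exception e) {
            return null;
          }
        }, 600_000L, "Failed to generate the inverted index");
      }
    }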
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
index 19bfcbb..6f7b408 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
@@ -18,61 +18,99 @@
*/
package org.apache.pinot.integration.tests;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.TextNode;
-import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
/**
* Cluster integration test for near realtime text search
*/
-@Test(enabled=false)
-public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
+ private static final String TEXT_COLUMN_NAME = "skills";
+ private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+ private static final int NUM_SKILLS = 24;
+ private static final int NUM_MATCHING_SKILLS = 4;
+ private static final int NUM_RECORDS = NUM_SKILLS * 1000;
+ private static final int NUM_MATCHING_RECORDS = NUM_MATCHING_SKILLS * 1000;
+
+ private static final String TEST_TEXT_COLUMN_QUERY =
+ "SELECT COUNT(*) FROM mytable WHERE TEXT_MATCH(skills, '\"machine learning\" AND spark')";
+
+ @Override
+ public String getTimeColumnName() {
+ return TIME_COLUMN_NAME;
+ }
- private static final String TABLE_NAME = "mytable";
- private static final String SKILLS_TEXT_COL_NAME = "SKILLS_TEXT_COL";
- private static final String TIME_COL_NAME = "TIME_COL";
- private static final String INT_COL_NAME = "INT_COL";
- private static final int INT_BASE_VALUE = 10000;
- Schema _schema;
+ // TODO: Support Lucene index on HLC consuming segments
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @Nullable
+ @Override
+ protected String getSortedColumn() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getInvertedIndexColumns() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ return Collections.singletonList(TEXT_COLUMN_NAME);
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getRangeIndexColumns() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getBloomFilterColumns() {
+ return null;
+ }
+
+ @Override
+ protected List<FieldConfig> getFieldConfigs() {
+ return Collections.singletonList(
+ new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null));
+ }
@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
- _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addSingleValueDimension(SKILLS_TEXT_COL_NAME, FieldSpec.DataType.STRING)
- .addMetric(INT_COL_NAME, FieldSpec.DataType.INT)
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COL_NAME), null)
- .build();
-
// Start the Pinot cluster
startZk();
startController();
@@ -82,37 +120,34 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration
// Start Kafka
startKafka();
- // Unpack the Avro files
+ // Create the Avro file
File avroFile = createAvroFile();
- ExecutorService executor = Executors.newCachedThreadPool();
-
- // Push data into the Kafka topic
- pushAvroIntoKafka(Lists.newArrayList(avroFile), getKafkaTopic(), executor);
-
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
-
- // Create Pinot table
- addSchema(_schema);
- List<FieldConfig> textIndexColumns = new ArrayList<>();
- FieldConfig fieldConfig = new FieldConfig(SKILLS_TEXT_COL_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null);
- textIndexColumns.add(fieldConfig);
-
- addRealtimeTable(TABLE_NAME, true, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
- getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, null, null, TABLE_NAME,
- getBrokerTenant(), getServerTenant(), getLoadMode(), null, null,
- null, null, getTaskConfig(), getStreamConsumerFactoryClassName(),
- 1, textIndexColumns);
-
- // just wait for 2sec for few docs to be loaded
- waitForDocsLoaded(2000L, false );
+ // Create and upload the schema and table config
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+ .addSingleValueDimension(TEXT_COLUMN_NAME, FieldSpec.DataType.STRING)
+ .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN_NAME), null)
+ .build();
+ addSchema(schema);
+ addTableConfig(createRealtimeTableConfig(avroFile));
+
+ // Push data into Kafka
+ pushAvroIntoKafka(Collections.singletonList(avroFile));
+
+ // Wait until the table is queryable
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return getCurrentCountStarResult() >= 0;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 10_000L, "Failed to get COUNT(*) result");
}
@AfterClass
public void tearDown()
throws Exception {
- dropRealtimeTable(TABLE_NAME);
+ dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
@@ -123,66 +158,61 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration
private File createAvroFile()
throws Exception {
- // read the skills file
- String[] skills = new String[100];
- int skillCount = 0;
- try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
- BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+ // Read all skills from the skill file
+ InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
+ assertNotNull(inputStream);
+ List<String> skills = new ArrayList<>(NUM_SKILLS);
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
- skills[skillCount++] = line;
+ skills.add(line);
}
}
-
- org.apache.avro.Schema avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(_schema);
- DataFileWriter avroRecordWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
- String pathToAvroFile = _tempDir.getAbsolutePath() + "/" + "skills.avro";
- File outputAvroFile = new File(pathToAvroFile);
- avroRecordWriter.create(avroSchema, outputAvroFile);
-
- int counter = 0;
- Random random = new Random();
- // create Avro file with 100k documents
- // it will be later pushed to Kafka
- while (counter < 100000) {
- GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(INT_COL_NAME, INT_BASE_VALUE + counter);
- if (counter >= skillCount) {
- int index = random.nextInt(skillCount);
- record.put(SKILLS_TEXT_COL_NAME, skills[index]);
- } else {
- record.put(SKILLS_TEXT_COL_NAME, skills[counter]);
+ assertEquals(skills.size(), NUM_SKILLS);
+
+ File avroFile = new File(_tempDir, "data.avro");
+ org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ avroSchema.setFields(Arrays.asList(new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null),
+ new org.apache.avro.Schema.Field(TIME_COLUMN_NAME,
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, null)));
+ try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(TEXT_COLUMN_NAME, skills.get(i % NUM_SKILLS));
+ record.put(TIME_COLUMN_NAME, System.currentTimeMillis());
+ fileWriter.append(record);
}
- record.put(TIME_COL_NAME, TimeUtils.getValidMinTimeMillis());
- avroRecordWriter.append(record);
- counter++;
}
-
- avroRecordWriter.close();
- return outputAvroFile;
+ return avroFile;
}
-
- // we need to make this more deterministic. internal release builds
- // are failing intermittently. disable until we make it reasonably deterministic
- @Test(enabled=false)
- public void testTextSearchCountQuery() throws Exception {
- String pqlQuery =
- "SELECT count(*) FROM " + TABLE_NAME + " WHERE text_match(SKILLS_TEXT_COL, '\"machine learning\" AND spark') LIMIT 1000000";
- int prevResult = 0;
- // run the same query 2000 times and see an increasing number of hits in the index and count(*) result
- for (int i = 0; i < 2000; i++) {
- JsonNode pinotResponse = postQuery(pqlQuery);
- Assert.assertTrue(pinotResponse.has("aggregationResults"));
- TextNode textNode = (TextNode)pinotResponse.get("aggregationResults").get(0).get("value");
- int result = Integer.valueOf(textNode.textValue());
- // TODO: see if this can be made more deterministic
- if (i >= 300) {
- Assert.assertTrue(result > 0);
- Assert.assertTrue(result >= prevResult);
- }
- prevResult = result;
- Thread.sleep(10);
+ @Test
+ public void testTextSearchCountQuery()
+ throws Exception {
+ // Keep posting queries until all records are consumed
+ long previousResult = 0;
+ while (getCurrentCountStarResult() < NUM_RECORDS) {
+ long result = getTextColumnQueryResult();
+ assertTrue(result >= previousResult);
+ previousResult = result;
+ Thread.sleep(100);
}
+
+ // TODO: Fix Lucene index on consuming segments to update the latest records, then uncomment the following part
+// TestUtils.waitForCondition(aVoid -> {
+// try {
+// return getTextColumnQueryResult() == NUM_MATCHING_RECORDS;
+// } catch (Exception e) {
+// fail("Caught exception while getting text column query result");
+// return false;
+// }
+// }, 10_000L, "Failed to reach expected number of matching records");
+ }
+
+ private long getTextColumnQueryResult()
+ throws Exception {
+ return postQuery(TEST_TEXT_COLUMN_QUERY).get("aggregationResults").get(0).get("value").asLong();
}
}
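For reference, enabling the Lucene text index in this test reduces to two overrides plus a TEXT_MATCH filter, sketched below. The column name skills, the FieldConfig constructor arguments and the query string are taken from the diff; the wrapper class itself is only illustrative.

    // Sketch only: declare the TEXT index on a raw-encoded column and query it
    // with TEXT_MATCH (phrase plus term).
    import java.util.Collections;
    import java.util.List;
    import org.apache.pinot.spi.config.table.FieldConfig;

    public abstract class TextIndexConfigSketch extends BaseClusterIntegrationTest {

      // Text-indexed columns are stored raw, i.e. without a dictionary
      @Override
      protected List<String> getNoDictionaryColumns() {
        return Collections.singletonList("skills");
      }

      // Declare the TEXT index on the same column via a FieldConfig entry
      @Override
      protected List<FieldConfig> getFieldConfigs() {
        return Collections.singletonList(
            new FieldConfig("skills", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null));
      }

      // Phrase + term search against the text index
      long countMachineLearningAndSpark()
          throws Exception {
        return postQuery("SELECT COUNT(*) FROM mytable WHERE TEXT_MATCH(skills, '\"machine learning\" AND spark')")
            .get("aggregationResults").get(0).get("value").asLong();
      }
    }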
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index d5fcb5a..8953ea1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -21,24 +21,23 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -71,37 +70,28 @@ public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest {
startBroker();
startServer();
- // Create the tables
- addOfflineTable(getTableName());
-
- // Create and upload segments
- File avroFile = createAvroFile();
- Schema schema = new Schema.SchemaBuilder().setSchemaName(getTableName())
+ // Create and upload the schema and table config
+ String rawTableName = getTableName();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(rawTableName)
.addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
.addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
.addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.INT)
.addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT).build();
- FieldSpec intKeyMapJsonStrFieldSpec = new DimensionFieldSpec();
- intKeyMapJsonStrFieldSpec.setDataType(DataType.STRING);
- intKeyMapJsonStrFieldSpec.setDefaultNullValue("");
- intKeyMapJsonStrFieldSpec.setName(INT_KEY_MAP_STR_FIELD_NAME);
- intKeyMapJsonStrFieldSpec.setTransformFunction("toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")");
- intKeyMapJsonStrFieldSpec.setSingleValueField(true);
- schema.addField(intKeyMapJsonStrFieldSpec);
- FieldSpec stringKeyMapJsonStrFieldSpec = new DimensionFieldSpec();
- stringKeyMapJsonStrFieldSpec.setDataType(DataType.STRING);
- stringKeyMapJsonStrFieldSpec.setDefaultNullValue("");
- stringKeyMapJsonStrFieldSpec.setName(STRING_KEY_MAP_STR_FIELD_NAME );
+ FieldSpec stringKeyMapJsonStrFieldSpec =
+ new DimensionFieldSpec(STRING_KEY_MAP_STR_FIELD_NAME, DataType.STRING, true);
stringKeyMapJsonStrFieldSpec.setTransformFunction("toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")");
- stringKeyMapJsonStrFieldSpec.setSingleValueField(true);
schema.addField(stringKeyMapJsonStrFieldSpec);
- ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(Collections.singletonList(avroFile), 0, _segmentDir, _tarDir, getTableName(), null, null,
- schema, executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
- uploadSegments(getTableName(), _tarDir);
+ FieldSpec intKeyMapJsonStrFieldSpec = new DimensionFieldSpec(INT_KEY_MAP_STR_FIELD_NAME, DataType.STRING, true);
+ intKeyMapJsonStrFieldSpec.setTransformFunction("toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")");
+ schema.addField(intKeyMapJsonStrFieldSpec);
+ addSchema(schema);
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).build();
+ addTableConfig(tableConfig);
+
+ // Create and upload segments
+ File avroFile = createAvroFile();
+ ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(rawTableName, _tarDir);
// Wait for all documents loaded
waitForAllDocsLoaded(60_000);
@@ -148,12 +138,12 @@ public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest {
JsonNode selectionResults = pinotResponse.get("selectionResults").get("results");
assertEquals(selectionResults.size(), 10);
for (int i = 0; i < 10; i++) {
- assertEquals(selectionResults.get(i).get(0).textValue(), String.format("{\"k1\":%d,\"k2\":100%d}",i,i));
+ assertEquals(selectionResults.get(i).get(0).textValue(), String.format("{\"k1\":%d,\"k2\":100%d}", i, i));
}
query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k1', 'INT') FROM " + getTableName();
- pinotResponse = postQuery(query);
+ pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
- selectionResults = pinotResponse.get("selectionResults").get("results");
+ selectionResults = pinotResponse.get("selectionResults").get("results");
assertEquals(selectionResults.size(), 10);
for (int i = 0; i < 10; i++) {
assertEquals(Integer.parseInt(selectionResults.get(i).get(0).textValue()), i);
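The map-type test's use of an ingestion-time transform can be summarized as below. DimensionFieldSpec, setTransformFunction, toJsonMapStr and the SchemaUtils key/value suffixes are taken from the diff; the source field name stringKeyMap and the schema name are assumptions made for the sketch.

    // Sketch only: flatten a map field into __KEYS/__VALUES columns and additionally
    // derive a JSON-string column from it via a transform function on the field spec.
    import org.apache.pinot.core.util.SchemaUtils;
    import org.apache.pinot.spi.data.DimensionFieldSpec;
    import org.apache.pinot.spi.data.FieldSpec.DataType;
    import org.apache.pinot.spi.data.Schema;

    public class MapTypeSchemaSketch {

      public static Schema buildSchema() {
        // The map field is flattened into multi-value key and value columns
        Schema schema = new Schema.SchemaBuilder().setSchemaName("mapTable")
            .addMultiValueDimension("stringKeyMap" + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
            .addMultiValueDimension("stringKeyMap" + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
            .build();

        // Keep the whole map as a JSON string as well, produced at ingestion time
        DimensionFieldSpec stringKeyMapStr = new DimensionFieldSpec("stringKeyMapStr", DataType.STRING, true);
        stringKeyMapStr.setTransformFunction("toJsonMapStr(stringKeyMap)");
        schema.addField(stringKeyMapStr);
        return schema;
      }
    }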
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
index a603c89..f3538d5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
@@ -23,11 +23,8 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -39,112 +36,48 @@ import static org.testng.Assert.assertTrue;
/**
- * /**
- * Integration test to check aggregation functions which use DictionaryBasedAggregationPlan and MetadataBasedAggregationPlan
- *
- * <ul>
- * <li>
- * Set up the Pinot cluster and create two tables, one with default indexes, one with star tree indexes
- * </li>
- * <li>
- * Send queries to both the tables and check results
- * </li>
- * </ul>
+ * Integration test to check aggregation functions which use {@code DictionaryBasedAggregationPlanNode} and
+ * {@code MetadataBasedAggregationPlanNode}.
*/
// TODO: remove this integration test and add unit test for metadata and dictionary based aggregation operator
public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends BaseClusterIntegrationTest {
- private static final int NUM_BROKERS = 1;
- private static final int NUM_SERVERS = 1;
-
- protected int getNumBrokers() {
- return NUM_BROKERS;
- }
-
- protected int getNumServers() {
- return NUM_SERVERS;
- }
-
- private static final String SCHEMA_FILE_NAME =
- "On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
-
- @Override
- protected String getSchemaFileName() {
- return SCHEMA_FILE_NAME;
- }
-
- private static final String DEFAULT_TABLE_NAME = "myTable";
- private static final String STAR_TREE_TABLE_NAME = "myStarTable";
-
- private String _currentTable;
-
- @Override
- protected String getTableName() {
- return _currentTable;
- }
@BeforeClass
public void setUp()
throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
startController();
- startBrokers(getNumBrokers());
- startServers(getNumServers());
+ startBroker();
+ startServer();
- // Create the tables
- addOfflineTable(DEFAULT_TABLE_NAME);
- addOfflineTable(STAR_TREE_TABLE_NAME);
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
- // Create and upload segments without star tree indexes from Avro data
- createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false, getRawIndexColumns(), null);
-
- // Create and upload segments with star tree indexes from Avro data
- createAndUploadSegments(avroFiles, STAR_TREE_TABLE_NAME, true, null, Schema.fromFile(getSchemaFile()));
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
- // Load data into H2
- _currentTable = DEFAULT_TABLE_NAME;
- loadDataIntoH2(avroFiles);
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
- _currentTable = STAR_TREE_TABLE_NAME;
- waitForAllDocsLoaded(600_000L);
- }
-
- private void loadDataIntoH2(List<File> avroFiles)
- throws Exception {
- ExecutorService executor = Executors.newCachedThreadPool();
- setUpH2Connection(avroFiles, executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
- }
-
- private void createAndUploadSegments(List<File> avroFiles, String tableName, boolean createStarTreeIndex,
- List<String> rawIndexColumns, Schema pinotSchema)
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-
- ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, null, rawIndexColumns, pinotSchema,
- executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
-
- uploadSegments(getTableName(), _tarDir);
}
@Test
public void testDictionaryBasedQueries()
throws Exception {
-
+ String tableName = getTableName();
String pqlQuery;
- String pqlStarTreeQuery;
String sqlQuery;
String sqlQuery1;
String sqlQuery2;
@@ -153,223 +86,148 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
// Test queries with min, max, minmaxrange
// Dictionary columns
// int
- pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrTime) FROM " + tableName;
+ sqlQuery = "SELECT MIN(ArrTime) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(ArrTime) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ArrTime), MAX(ArrTime), MINMAXRANGE(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrTime), MAX(ArrTime), MINMAXRANGE(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrTime), MAX(ArrTime), MINMAXRANGE(ArrTime) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ArrTime) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(ArrTime) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(ArrTime), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrTime), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrTime), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ArrTime) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// float
- pqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
+ sqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
+ sqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(DepDelayMinutes) FROM " + tableName;
+ sqlQuery = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery =
- "SELECT MIN(DepDelayMinutes), MAX(DepDelayMinutes), MINMAXRANGE(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery =
- "SELECT MIN(DepDelayMinutes), MAX(DepDelayMinutes), MINMAXRANGE(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(DepDelayMinutes), MAX(DepDelayMinutes), MINMAXRANGE(DepDelayMinutes) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(DepDelayMinutes), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(DepDelayMinutes), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(DepDelayMinutes), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// double
- pqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery =
- "SELECT MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MINMAXRANGE(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery =
- "SELECT MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MINMAXRANGE(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MINMAXRANGE(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(ArrDelayMinutes), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrDelayMinutes), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrDelayMinutes), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// long
- pqlQuery = "SELECT MAX(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(AirlineID) FROM " + tableName;
+ sqlQuery = "SELECT MAX(AirlineID) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(AirlineID) FROM " + tableName;
+ sqlQuery = "SELECT MIN(AirlineID) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(AirlineID) FROM " + tableName;
+ sqlQuery = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(AirlineID), MAX(AirlineID), MINMAXRANGE(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(AirlineID), MAX(AirlineID), MINMAXRANGE(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(AirlineID), MAX(AirlineID), MINMAXRANGE(AirlineID) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(AirlineID) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(AirlineID) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(AirlineID), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(AirlineID), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(AirlineID), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(AirlineID) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// string
// TODO: add test cases for string column when we add support for min and max on string datatype columns
// Non dictionary columns
// int
- pqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(ActualElapsedTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ActualElapsedTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
+ sqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(ActualElapsedTime) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(ActualElapsedTime) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), MINMAXRANGE(ActualElapsedTime) FROM "
- + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), MINMAXRANGE(ActualElapsedTime) FROM "
- + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery =
+ "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), MINMAXRANGE(ActualElapsedTime) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(ActualElapsedTime), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ActualElapsedTime), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ActualElapsedTime), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// float
- pqlQuery = "SELECT MAX(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ArrDelay) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ArrDelay) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrDelay) FROM " + tableName;
+ sqlQuery = "SELECT MIN(ArrDelay) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(ArrDelay) FROM " + tableName;
+ sqlQuery = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(ArrDelay), MAX(ArrDelay), MINMAXRANGE(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrDelay), MAX(ArrDelay), MINMAXRANGE(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrDelay), MAX(ArrDelay), MINMAXRANGE(ArrDelay) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(ArrDelay) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(ArrDelay), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(ArrDelay), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(ArrDelay), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// double
- pqlQuery = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
+ sqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(DepDelay) FROM " + tableName;
+ sqlQuery = "SELECT MIN(DepDelay) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MINMAXRANGE(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MINMAXRANGE(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MINMAXRANGE(DepDelay) FROM " + tableName;
+ sqlQuery = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
- pqlQuery = "SELECT MIN(DepDelay), MAX(DepDelay), MINMAXRANGE(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(DepDelay), MAX(DepDelay), MINMAXRANGE(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery3 = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(DepDelay), MAX(DepDelay), MINMAXRANGE(DepDelay) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(DepDelay) FROM " + tableName;
+ sqlQuery2 = "SELECT MAX(DepDelay) FROM " + tableName;
+ sqlQuery3 = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
- pqlQuery = "SELECT MIN(DepDelay), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MIN(DepDelay), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery1 = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
- sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MIN(DepDelay), COUNT(*) FROM " + tableName;
+ sqlQuery1 = "SELECT MIN(DepDelay) FROM " + tableName;
+ sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
- testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
// string
// TODO: add test cases for string column when we add support for min and max on string datatype columns
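A note on the verification convention used throughout the hunk above: every PQL aggregation is cross-checked against one or more H2 SQL queries, and because H2 has no MINMAXRANGE function it is verified as MAX minus MIN. A minimal sketch (tableName and testQuery as used above):

  // MINMAXRANGE(col) in PQL is expected to match MAX(col) - MIN(col) in H2.
  testQuery("SELECT MINMAXRANGE(ArrTime) FROM " + tableName,
      Collections.singletonList("SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + tableName));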
@@ -378,42 +236,42 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
JsonNode response;
// Dictionary column: answered by DictionaryBasedAggregationOperator
- pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
response = postQuery(pqlQuery);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
// Non dictionary column: not answered by DictionaryBasedAggregationOperator
- pqlQuery = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
response = postQuery(pqlQuery);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), response.get("numDocsScanned").asLong());
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
// multiple dictionary based aggregation functions, dictionary columns: answered by DictionaryBasedAggregationOperator
- pqlQuery = "SELECT MAX(ArrTime),MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ArrTime),MIN(ArrTime) FROM " + tableName;
response = postQuery(pqlQuery);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
// multiple aggregation functions, mix of dictionary based and non dictionary based: not answered by DictionaryBasedAggregationOperator
- pqlQuery = "SELECT MAX(ArrTime),COUNT(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(ArrTime),COUNT(ArrTime) FROM " + tableName;
response = postQuery(pqlQuery);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), response.get("numDocsScanned").asLong());
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
// group by in query : not answered by DictionaryBasedAggregationOperator
- pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME + " group by DaysSinceEpoch";
+ pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + " group by DaysSinceEpoch";
response = postQuery(pqlQuery);
assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
// filter in query: not answered by DictionaryBasedAggregationOperator
- pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME + " where DaysSinceEpoch > 16100";
+ pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + " where DaysSinceEpoch > 16100";
response = postQuery(pqlQuery);
assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
@@ -422,73 +280,54 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
@Test
public void testMetadataBasedQueries()
throws Exception {
-
+ String tableName = getTableName();
String pqlQuery;
- String pqlStarTreeQuery;
String sqlQuery;
// Test queries with count *
- pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT COUNT(*) FROM " + tableName;
+ sqlQuery = "SELECT COUNT(*) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
// Test queries with max on time column
- pqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + DEFAULT_TABLE_NAME;
- pqlStarTreeQuery = "SELECT MAX(DaysSinceEpoch) FROM " + STAR_TREE_TABLE_NAME;
- sqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + tableName;
+ sqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + tableName;
testQuery(pqlQuery, Collections.singletonList(sqlQuery));
- testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
// Check execution stats
JsonNode response;
- pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT COUNT(*) FROM " + tableName;
response = postQuery(pqlQuery);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
- pqlStarTreeQuery = "SELECT COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
- response = postQuery(pqlStarTreeQuery);
- assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
- assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
- assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
-
// group by present in query: not answered by MetadataBasedAggregationOperator
- pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME + " GROUP BY DaysSinceEpoch";
+ pqlQuery = "SELECT COUNT(*) FROM " + tableName + " GROUP BY DaysSinceEpoch";
response = postQuery(pqlQuery);
assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
// filter present in query: not answered by MetadataBasedAggregationOperator
- pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME + " WHERE DaysSinceEpoch > 16100";
+ pqlQuery = "SELECT COUNT(*) FROM " + tableName + " WHERE DaysSinceEpoch > 16100";
response = postQuery(pqlQuery);
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
// mixed aggregation functions in query: not answered by MetadataBasedAggregationOperator
- pqlQuery = "SELECT COUNT(*),MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+ pqlQuery = "SELECT COUNT(*),MAX(ArrTime) FROM " + tableName;
response = postQuery(pqlQuery);
assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
-
- // mixed aggregation functions in star tree query: not answered by MetadataBasedAggregationOperator
- pqlStarTreeQuery = "SELECT COUNT(*),MAX(DaysSinceEpoch) FROM " + STAR_TREE_TABLE_NAME;
- response = postQuery(pqlStarTreeQuery);
- assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
- assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
- assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
}
@AfterClass
public void tearDown()
throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
- dropOfflineTable(STAR_TREE_TABLE_NAME);
+ dropOfflineTable(getTableName());
stopServer();
stopBroker();
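For reference, a condensed sketch of the execution-stat checks the two tests above rely on (postQuery and the response field names are taken from the hunks; the queries and table name are illustrative):

  // Answered from segment metadata or the dictionary alone: nothing is scanned post filter.
  JsonNode response = postQuery("SELECT MAX(ArrTime) FROM mytable");
  assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);

  // A filter or group-by forces a scan, so the optimized operators no longer apply.
  response = postQuery("SELECT MAX(ArrTime) FROM mytable WHERE DaysSinceEpoch > 16100");
  assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);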
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 14b5bc4..f433a3e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,14 +24,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.exception.QueryException;
@@ -42,6 +38,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -51,7 +48,10 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
/**
@@ -72,18 +72,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
- // For inverted index triggering test
+ // For range index triggering test
private static final List<String> UPDATED_RANGE_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
private static final String TEST_UPDATED_RANGE_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime > 305";
+ // For bloom filter triggering test
private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
// For default columns test
- private static final String SCHEMA_WITH_EXTRA_COLUMNS =
+ private static final String SCHEMA_FILE_NAME_WITH_EXTRA_COLUMNS =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema";
- private static final String SCHEMA_WITH_MISSING_COLUMNS =
+ private static final String SCHEMA_FILE_NAME_WITH_MISSING_COLUMNS =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_missing_columns.schema";
private static final String TEST_DEFAULT_COLUMNS_QUERY =
"SELECT COUNT(*) FROM mytable WHERE NewAddedIntDimension < 0";
@@ -100,6 +101,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
return NUM_SERVERS;
}
+ private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+
+ @Override
+ protected String getSchemaFileName() {
+ return _schemaFileName;
+ }
+
+ // NOTE: Only allow removing default columns for v1 segment
+ @Override
+ protected String getSegmentVersion() {
+ return SegmentVersion.v1.name();
+ }
+
@BeforeClass
public void setUp()
throws Exception {
@@ -111,31 +125,24 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
startBrokers(getNumBrokers());
startServers(getNumServers());
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
+
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
- ExecutorService executor = Executors.newCachedThreadPool();
-
- // Create segments from Avro data
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, getTableName(), null, getRawIndexColumns(), null,
- executor);
-
- // Load data into H2
- setUpH2Connection(avroFiles, executor);
-
- // Initialize query generator
- setUpQueryGenerator(avroFiles, executor);
-
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
- // Create the table
- addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
- getBloomFilterIndexColumns(), getRangeIndexColumns(), getTaskConfig(), null, null);
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
- // Upload all segments
- uploadSegments(getTableName(), _tarDir);
+ // Initialize the query generator
+ setUpQueryGenerator(avroFiles);
// Set up service status callbacks
// NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
@@ -199,12 +206,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
@Test
public void testRefreshTableConfigAndQueryTimeout()
throws Exception {
- TableConfig tableConfig = _helixResourceManager.getOfflineTableConfig(getTableName());
- assertNotNull(tableConfig);
-
// Set timeout as 5ms so that query will timeout
+ TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(5L));
- _helixResourceManager.updateTableConfig(tableConfig);
+ updateTableConfig(tableConfig);
// Wait for at most 1 minute for broker to receive and process the table config refresh message
TestUtils.waitForCondition(aVoid -> {
@@ -228,7 +233,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Remove timeout so that query will finish
tableConfig.setQueryConfig(null);
- _helixResourceManager.updateTableConfig(tableConfig);
+ updateTableConfig(tableConfig);
// Wait for at most 1 minute for broker to receive and process the table config refresh message
TestUtils.waitForCondition(aVoid -> {
@@ -280,16 +285,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
@Test
public void testInvertedIndexTriggering()
throws Exception {
- final long numTotalDocs = getCountStarResult();
+ long numTotalDocs = getCountStarResult();
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
// Update table config and trigger reload
- updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
- UPDATED_INVERTED_INDEX_COLUMNS, null, null, getTaskConfig(), null, null);
-
- sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
+ updateTableConfig(tableConfig);
+ reloadOfflineTable(getTableName());
TestUtils.waitForCondition(aVoid -> {
try {
@@ -304,58 +309,55 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testBloomFilterTriggering()
+ public void testRangeIndexTriggering()
throws Exception {
- final long numTotalDocs = getCountStarResult();
- JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
- assertEquals(queryResponse.get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
+ long numTotalDocs = getCountStarResult();
- // Update table config and trigger reload
- updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
- UPDATED_BLOOM_FILTER_COLUMNS, null, getTaskConfig(), null, null);
+ JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
+ assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
- sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+ // Update table config and trigger reload
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
+ updateTableConfig(tableConfig);
+ reloadOfflineTable(getTableName());
TestUtils.waitForCondition(aVoid -> {
try {
- JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+ JsonNode queryResponse1 = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
// Total docs should not change during reload
assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
- return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
+ return queryResponse1.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
} catch (Exception e) {
throw new RuntimeException(e);
}
- }, 600_000L, "Failed to generate bloom index");
+ }, 600_000L, "Failed to generate range index");
}
@Test
- public void testRangeIndexTriggering()
+ public void testBloomFilterTriggering()
throws Exception {
- final long numTotalDocs = getCountStarResult();
- JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
- System.out.println("Before queryResponse = " + queryResponse);
- assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
- long beforeCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+ long numTotalDocs = getCountStarResult();
- // Update table config and trigger reload
- updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null, null,
- UPDATED_RANGE_INDEX_COLUMNS, getTaskConfig(), null, null);
+ JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+ assertEquals(queryResponse.get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
- sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+ // Update table config and trigger reload
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS);
+ updateTableConfig(tableConfig);
+ reloadOfflineTable(getTableName());
TestUtils.waitForCondition(aVoid -> {
try {
- JsonNode queryResponse1 = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
- System.out.println("After queryResponse = " + queryResponse);
+ JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+ // Total docs should not change during reload
assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
- long afterCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
- //we should be scanning less than numTotalDocs with index enabled.
- //In the current implementation its 8785, but it
- return beforeCount == afterCount && queryResponse1.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
+ return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
} catch (Exception e) {
throw new RuntimeException(e);
}
- }, 600_000L, "Failed to generate inverted index");
+ }, 600_000L, "Failed to generate bloom filter");
}
/**
@@ -392,18 +394,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(queryResponse.get("selectionResults").get("columns").size(), 79);
}
- private void reloadDefaultColumns(final boolean withExtraColumns)
+ private void reloadDefaultColumns(boolean withExtraColumns)
throws Exception {
- final long numTotalDocs = getCountStarResult();
+ long numTotalDocs = getCountStarResult();
if (withExtraColumns) {
- sendSchema(SCHEMA_WITH_EXTRA_COLUMNS);
+ _schemaFileName = SCHEMA_FILE_NAME_WITH_EXTRA_COLUMNS;
+ addSchema(createSchema());
} else {
- sendSchema(SCHEMA_WITH_MISSING_COLUMNS);
+ _schemaFileName = SCHEMA_FILE_NAME_WITH_MISSING_COLUMNS;
+ addSchema(createSchema());
}
// Trigger reload
- sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+ reloadOfflineTable(getTableName());
String errorMessage;
if (withExtraColumns) {
@@ -429,14 +433,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}, 600_000L, errorMessage);
}
- private void sendSchema(String resourceName)
- throws Exception {
- URL resource = OfflineClusterIntegrationTest.class.getClassLoader().getResource(resourceName);
- assertNotNull(resource);
- File schemaFile = new File(resource.getFile());
- addSchema(schemaFile, getTableName());
- }
-
private void testNewAddedColumns()
throws Exception {
long numTotalDocs = getCountStarResult();
@@ -926,24 +922,17 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testCaseInsensitivity()
- throws Exception {
- addSchema(getSchemaFile(), getTableName());
- List<String> queries = new ArrayList<>();
+ public void testCaseInsensitivity() {
int daysSinceEpoch = 16138;
long secondsSinceEpoch = 16138 * 24 * 60 * 60;
- queries.add("SELECT * FROM mytable");
- queries.add("SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable");
- queries.add(
- "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000");
- queries.add(
- "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000");
- queries.add("SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch);
- queries
- .add("SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch);
- queries.add("SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch);
- queries.add("SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable");
- queries.add(
+ List<String> queries = Arrays.asList("SELECT * FROM mytable",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
+ "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+ "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+ "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+ "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
queries.replaceAll(query -> query.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"));
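All three index-triggering tests (inverted index, range index, bloom filter) now share the same reload flow; a condensed sketch using the inverted-index case (method and constant names are taken from the hunks above, the timeout is the one used there):

  TableConfig tableConfig = getOfflineTableConfig();
  tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
  updateTableConfig(tableConfig);
  reloadOfflineTable(getTableName());
  TestUtils.waitForCondition(aVoid -> {
    try {
      // The index is in effect once the filter no longer scans every document.
      return postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY)
          .get("numEntriesScannedInFilter").asLong() < numTotalDocs;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }, 600_000L, "Failed to generate inverted index");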
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
deleted file mode 100644
index 6e99f30..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.io.FileUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Tests the URI upload path through a local file uri.
- */
-public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet {
- private static final Logger LOGGER = LoggerFactory.getLogger(PinotURIUploadIntegrationTest.class);
- private String _tableName;
- private File _metadataDir = new File(_segmentDir, "tmpMeta");
-
- @Nonnull
- @Override
- protected String getTableName() {
- return _tableName;
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- FileUtils.deleteQuietly(_metadataDir);
- FileUtils.deleteQuietly(new File(_metadataDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
- // Start an empty Pinot cluster
- startZk();
- startController();
- startBroker();
- startServer();
- }
-
- @BeforeMethod
- public void setupMethod(Object[] args)
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
- if (args == null || args.length == 0) {
- return;
- }
- _tableName = (String) args[0];
- SegmentVersion version = (SegmentVersion) args[1];
- addOfflineTable(_tableName, version);
- }
-
- @AfterMethod
- public void teardownMethod()
- throws Exception {
- if (_tableName != null) {
- dropOfflineTable(_tableName);
- }
- }
-
- private File generateRandomSegment(String segmentName, int rowCount)
- throws Exception {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Schema schema = new Schema.Parser()
- .parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
- GenericRecord record = new GenericData.Record(schema);
- GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
- DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
- File avroFile = new File(_tempDir, segmentName + ".avro");
- fileWriter.create(schema, avroFile);
-
- for (int i = 0; i < rowCount; i++) {
- record.put(0, random.nextInt());
- fileWriter.append(record);
- }
-
- fileWriter.close();
-
- int segmentIndex = Integer.parseInt(segmentName.split("_")[1]);
-
- File segmentTarDir = new File(_tarDir, segmentName);
- TestUtils.ensureDirectoriesExistAndEmpty(segmentTarDir);
- ExecutorService executor = MoreExecutors.newDirectExecutorService();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(Collections.singletonList(avroFile), segmentIndex, new File(_segmentDir, segmentName),
- segmentTarDir, this._tableName, executor);
- executor.shutdown();
- executor.awaitTermination(1L, TimeUnit.MINUTES);
-
- FileUtils.forceDelete(avroFile);
- return new File(_tarDir, segmentName);
- }
-
- @DataProvider(name = "configProvider")
- public Object[][] configProvider() {
- Object[][] configs = {{"mytable", SegmentVersion.v1}, {"yourtable", SegmentVersion.v3}};
- return configs;
- }
-
- @Test(dataProvider = "configProvider")
- public void testRefresh(String tableName, SegmentVersion version)
- throws Exception {
- final String segment6 = "segmentToBeRefreshed_6";
- final int nRows1 = 69;
- File segmentTarDir = generateRandomSegment(segment6, nRows1);
- uploadSegmentsDirectly(segmentTarDir);
- verifyNRows(0, nRows1);
- FileUtils.forceDelete(segmentTarDir);
- }
-
- // Verify that the number of rows is either the initial value or the final value but not something else.
- private void verifyNRows(int currentNrows, int finalNrows)
- throws Exception {
- int attempt = 0;
- long sleepTime = 100;
- long nRows;
- while (attempt < 10) {
- Thread.sleep(sleepTime);
- try {
- nRows = getCurrentCountStarResult();
- } catch (Exception e) {
- nRows = -1;
- }
- //nRows can either be the current value or the final value, not any other.
- if (nRows == currentNrows || nRows == -1) {
- sleepTime *= 2;
- attempt++;
- } else if (nRows == finalNrows) {
- return;
- } else {
- Assert.fail("Found unexpected number of rows " + nRows);
- }
- }
- Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
- }
-
- @AfterClass
- public void tearDown() {
- stopServer();
- stopBroker();
- stopController();
- stopZk();
- FileUtils.deleteQuietly(_tempDir);
- FileUtils.deleteQuietly(_metadataDir);
- FileUtils.deleteQuietly(new File(_metadataDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
- }
-
- /**
- * Upload all segments inside the given directory to the cluster.
- *
- * @param segmentDir Segment directory
- */
- private void uploadSegmentsDirectly(@Nonnull File segmentDir)
- throws Exception {
- String[] segmentNames = segmentDir.list();
- Assert.assertNotNull(segmentNames);
-
- try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
- // Upload all segments in parallel
- int numSegments = segmentNames.length;
- ExecutorService executor = Executors.newFixedThreadPool(numSegments);
- List<Future<Integer>> tasks = new ArrayList<>(numSegments);
- for (final String segmentName : segmentNames) {
- File segmentFile = new File(segmentDir, segmentName);
- String downloadUri = segmentFile.toURI().toString();
- final List<Header> httpHeaders = null;
-
- tasks.add(executor.submit(new Callable<Integer>() {
- @Override
- public Integer call()
- throws Exception {
- List<NameValuePair> parameters = Collections.singletonList(
- new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, getTableName()));
-
- return fileUploadDownloadClient
- .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort),
- downloadUri, httpHeaders, parameters, 60 * 1000).getStatusCode();
- }
- }));
- }
- for (Future<Integer> task : tasks) {
- Assert.assertEquals((int) task.get(), HttpStatus.SC_OK);
- }
- Assert.assertTrue(getAllSegments(getTableName()).size() == 1);
-
- executor.shutdown();
- }
- }
-
- private List<String> getAllSegments(String tableName)
- throws IOException {
- List<String> allSegments = new ArrayList<>();
- HttpHost controllerHttpHost = new HttpHost("localhost", _controllerPort);
- HttpClient controllerClient = new DefaultHttpClient();
- HttpGet req = new HttpGet("/segments/" + tableName);
- HttpResponse res = controllerClient.execute(controllerHttpHost, req);
- try {
- if (res.getStatusLine().getStatusCode() != 200) {
- throw new IllegalStateException(res.getStatusLine().toString());
- }
- InputStream content = res.getEntity().getContent();
- JsonNode segmentsData = JsonUtils.inputStreamToJsonNode(content);
-
- if (segmentsData != null) {
- JsonNode offlineSegments = segmentsData.get(0).get("OFFLINE");
- if (offlineSegments != null) {
- for (JsonNode segment : offlineSegments) {
- allSegments.add(segment.asText());
- }
- }
- }
- LOGGER.info("All segments : {}", allSegments);
- } finally {
- if (res.getEntity() != null) {
- EntityUtils.consume(res.getEntity());
- }
- }
- return allSegments;
- }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6db9137..6e7636a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -21,9 +21,6 @@ package org.apache.pinot.integration.tests;
import java.io.File;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -53,22 +50,18 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
- ExecutorService executor = Executors.newCachedThreadPool();
+ // Create and upload the schema and table config
+ addSchema(createSchema());
+ addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
- // Push data into the Kafka topic
- pushAvroIntoKafka(avroFiles, getKafkaTopic(), executor);
+ // Push data into Kafka
+ pushAvroIntoKafka(avroFiles);
- // Load data into H2
- setUpH2Connection(avroFiles, executor);
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
- // Initialize query generator
- setUpQueryGenerator(avroFiles, executor);
-
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
-
- // Create Pinot table
- setUpRealtimeTable(avroFiles.get(0));
+ // Initialize the query generator
+ setUpQueryGenerator(avroFiles);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
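Taken together with the offline setup earlier in this commit, the contrast the refactoring standardizes looks roughly like this (a sketch only; method names are from the hunks, the timeout is the one used above):

  // Offline tests: build segments from the Avro files and upload them to the controller.
  ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
  uploadSegments(getTableName(), _tarDir);

  // Realtime tests: push the same Avro records into Kafka and wait for the servers to consume them.
  pushAvroIntoKafka(avroFiles);
  waitForAllDocsLoaded(600_000L);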
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
index 0dc4560..a03ca98 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
@@ -60,6 +60,11 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest
private String _currentSegment;
@Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @Override
protected int getNumKafkaPartitions() {
return NUM_KAFKA_PARTITIONS;
}
@@ -76,13 +81,9 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest
// Start Kafka
startKafka();
- // Create Pinot table
- setUpRealtimeTable(null);
- }
-
- @Override
- protected boolean useLlc() {
- return true;
+ // Create and upload the schema and table config
+ addSchema(createSchema());
+ addTableConfig(createRealtimeTableConfig(null));
}
/**
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 3416164..a0be8ed 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -31,7 +31,6 @@ import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.events.MinionEventObserver;
import org.apache.pinot.minion.events.MinionEventObserverFactory;
@@ -42,6 +41,7 @@ import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -84,11 +84,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Add 3 offline tables, where 2 of them have TestTask enabled
TableTaskConfig taskConfig =
new TableTaskConfig(Collections.singletonMap(TestTaskGenerator.TASK_TYPE, Collections.emptyMap()));
- addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
- null);
- addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
- null);
- addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null, null);
+ addTableConfig(
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(taskConfig).build());
+ addTableConfig(
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setTaskConfig(taskConfig).build());
+ addTableConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_3).build());
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
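The builder-based calls above are the pattern that replaces the removed addOfflineTable(name, null, null, ...) signatures throughout the commit; a minimal standalone sketch (the table name and task type are illustrative):

  TableTaskConfig taskConfig =
      new TableTaskConfig(Collections.singletonMap("TestTask", Collections.emptyMap()));
  addTableConfig(new TableConfigBuilder(TableType.OFFLINE)
      .setTableName("testTable")
      .setTaskConfig(taskConfig)
      .build());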
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index ccb2e17..7b80f71 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -23,17 +23,12 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
-import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.tools.query.comparison.QueryComparison;
import org.apache.pinot.tools.query.comparison.SegmentInfoProvider;
@@ -73,32 +68,30 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
AggregationFunctionType.SUM, AggregationFunctionType.AVG, AggregationFunctionType.MINMAXRANGE);
private static final int NUM_QUERIES_TO_GENERATE = 100;
- private List<String> _starTree1Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
- private List<String> _starTree2Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
- private List<String> _starTree1Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
- private List<String> _starTree2Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
+ private String _currentTable;
private StarTreeQueryGenerator _starTree1QueryGenerator;
private StarTreeQueryGenerator _starTree2QueryGenerator;
- private Schema _schema;
- private String _currentTable;
-
- @Nonnull
@Override
protected String getTableName() {
return _currentTable;
}
- @Nonnull
@Override
protected String getSchemaFileName() {
return SCHEMA_FILE_NAME;
}
+ // NOTE: Star-Tree and SegmentInfoProvider do not work on no-dictionary dimensions
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay");
+ }
+
@BeforeClass
public void setUp()
throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
@@ -106,87 +99,73 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
startBroker();
startServers(2);
- // Create the tables
- addOfflineTable(DEFAULT_TABLE_NAME);
- addOfflineTable(STAR_TREE_TABLE_NAME);
-
- // Set up segments and query generator
- _schema = Schema.fromFile(getSchemaFile());
- setUpSegmentsAndQueryGenerator();
-
- // Wait for all documents loaded
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
_currentTable = DEFAULT_TABLE_NAME;
- waitForAllDocsLoaded(600_000L);
- _currentTable = STAR_TREE_TABLE_NAME;
- waitForAllDocsLoaded(600_000L);
- }
+ TableConfig defaultTableConfig = createOfflineTableConfig();
+ addTableConfig(defaultTableConfig);
- private void setUpSegmentsAndQueryGenerator()
- throws Exception {
- // Randomly pick some dimensions and metrics for star-tree V2
- List<String> allDimensions = new ArrayList<>(_schema.getDimensionNames());
+ // Randomly pick some dimensions and metrics for star-trees
+ List<String> starTree1Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
+ List<String> starTree2Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
+ List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
Collections.shuffle(allDimensions);
for (int i = 0; i < NUM_STAR_TREE_DIMENSIONS; i++) {
- _starTree1Dimensions.add(allDimensions.get(2 * i));
- _starTree2Dimensions.add(allDimensions.get(2 * i + 1));
+ starTree1Dimensions.add(allDimensions.get(2 * i));
+ starTree2Dimensions.add(allDimensions.get(2 * i + 1));
}
- List<String> allMetrics = new ArrayList<>(_schema.getMetricNames());
+ List<String> starTree1Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
+ List<String> starTree2Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
+ List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
Collections.shuffle(allMetrics);
for (int i = 0; i < NUM_STAR_TREE_METRICS; i++) {
- _starTree1Metrics.add(allMetrics.get(2 * i));
- _starTree2Metrics.add(allMetrics.get(2 * i + 1));
+ starTree1Metrics.add(allMetrics.get(2 * i));
+ starTree2Metrics.add(allMetrics.get(2 * i + 1));
}
+ _currentTable = STAR_TREE_TABLE_NAME;
+ TableConfig starTreeTableConfig = createOfflineTableConfig();
+ starTreeTableConfig.getIndexingConfig().setStarTreeIndexConfigs(Arrays
+ .asList(getStarTreeIndexConfig(starTree1Dimensions, starTree1Metrics),
+ getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics)));
+ addTableConfig(starTreeTableConfig);
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
- // Create and upload segments without star tree indexes from Avro data
- createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, defaultTableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+ TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, starTreeTableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(STAR_TREE_TABLE_NAME, _tarDir);
- // Initialize the query generator using segments without star tree indexes
- SegmentInfoProvider segmentInfoProvider = new SegmentInfoProvider(_tarDir.getAbsolutePath());
+ // Set up the query generators
+ SegmentInfoProvider segmentInfoProvider = new SegmentInfoProvider(_tarDir.getPath());
List<String> aggregationFunctions = new ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
aggregationFunctions.add(functionType.getName());
}
- _starTree1QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, _starTree1Dimensions, _starTree1Metrics,
+ _starTree1QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, starTree1Dimensions, starTree1Metrics,
segmentInfoProvider.getSingleValueDimensionValuesMap(), aggregationFunctions);
- _starTree2QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, _starTree2Dimensions, _starTree2Metrics,
+ _starTree2QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, starTree2Dimensions, starTree2Metrics,
segmentInfoProvider.getSingleValueDimensionValuesMap(), aggregationFunctions);
- // Create and upload segments with star tree indexes from Avro data
- createAndUploadSegments(avroFiles, STAR_TREE_TABLE_NAME, true);
- }
-
- private void createAndUploadSegments(List<File> avroFiles, String tableName, boolean createStarTreeIndex)
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-
- List<StarTreeV2BuilderConfig> starTreeV2BuilderConfigs = null;
- if (createStarTreeIndex) {
- starTreeV2BuilderConfigs = Arrays.asList(getBuilderConfig(_starTree1Dimensions, _starTree1Metrics),
- getBuilderConfig(_starTree2Dimensions, _starTree2Metrics));
- }
-
- ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, starTreeV2BuilderConfigs, null, _schema,
- executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
-
- uploadSegments(getTableName(), _tarDir);
+ // Wait for all documents loaded
+ _currentTable = DEFAULT_TABLE_NAME;
+ waitForAllDocsLoaded(600_000L);
+ _currentTable = STAR_TREE_TABLE_NAME;
+ waitForAllDocsLoaded(600_000L);
}
- private static StarTreeV2BuilderConfig getBuilderConfig(List<String> dimensions, List<String> metrics) {
- Set<AggregationFunctionColumnPair> functionColumnPairs = new HashSet<>();
+ private static StarTreeIndexConfig getStarTreeIndexConfig(List<String> dimensions, List<String> metrics) {
+ List<String> functionColumnPairs = new ArrayList<>();
for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
for (String metric : metrics) {
- functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric));
+ functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric).toColumnName());
}
}
- return new StarTreeV2BuilderConfig.Builder().setDimensionsSplitOrder(dimensions)
- .setFunctionColumnPairs(functionColumnPairs).setMaxLeafRecords(10).build();
+ return new StarTreeIndexConfig(dimensions, null, functionColumnPairs, 0);
}
@Test
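
For context on the change above: the star-tree setup now hangs off the table config (a StarTreeIndexConfig on the indexing config) instead of a standalone StarTreeV2BuilderConfig passed to the segment builder. Below is a minimal sketch of that pattern, not part of the commit, built only from APIs that appear in this diff; the class name, table name parameter, and the maxLeafRecords value of 0 (the same value the test uses) are illustrative.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class StarTreeTableConfigSketch {
  // Sketch only, not part of the commit: build an offline table config with a star-tree index
  // over the given dimensions, metrics and aggregation function types.
  public static TableConfig buildStarTreeTableConfig(String tableName, List<String> dimensions,
      List<String> metrics, List<AggregationFunctionType> functionTypes) {
    // Function-column pairs are stored on the index config by their column-pair names
    List<String> functionColumnPairs = new ArrayList<>();
    for (AggregationFunctionType functionType : functionTypes) {
      for (String metric : metrics) {
        functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric).toColumnName());
      }
    }
    // maxLeafRecords of 0 mirrors the value used in the test above
    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(dimensions, null, functionColumnPairs, 0);
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build();
    tableConfig.getIndexingConfig().setStarTreeIndexConfigs(Collections.singletonList(starTreeIndexConfig));
    return tableConfig;
  }
}
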
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
deleted file mode 100644
index fabceae..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.File;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Test that uploads, refreshes and deletes segments from multiple threads and checks that the row count in Pinot
- * matches with the expected count.
- *
- * @author jfim
- */
-// TODO: clean up this test
-public class UploadRefreshDeleteIntegrationTest extends BaseClusterIntegrationTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(UploadRefreshDeleteIntegrationTest.class);
- private String _tableName;
-
- @Nonnull
- @Override
- protected String getTableName() {
- return _tableName;
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- // Start an empty Pinot cluster
- startZk();
- startController();
- startBroker();
- startServer();
- }
-
- @BeforeMethod
- public void setupMethod(Object[] args)
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
- if (args == null || args.length == 0) {
- return;
- }
- _tableName = (String) args[0];
- SegmentVersion version = (SegmentVersion) args[1];
- addOfflineTable(_tableName, version);
- }
-
- @AfterMethod
- public void teardownMethod()
- throws Exception {
- if (_tableName != null) {
- dropOfflineTable(_tableName);
- }
- }
-
- protected void generateAndUploadRandomSegment(String segmentName, int rowCount)
- throws Exception {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Schema schema = new Schema.Parser()
- .parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
- GenericRecord record = new GenericData.Record(schema);
- GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
- DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
- File avroFile = new File(_tempDir, segmentName + ".avro");
- fileWriter.create(schema, avroFile);
-
- for (int i = 0; i < rowCount; i++) {
- record.put(0, random.nextInt());
- fileWriter.append(record);
- }
-
- fileWriter.close();
-
- int segmentIndex = Integer.parseInt(segmentName.split("_")[1]);
-
- File segmentTarDir = new File(_tarDir, segmentName);
- TestUtils.ensureDirectoriesExistAndEmpty(segmentTarDir);
- ExecutorService executor = MoreExecutors.newDirectExecutorService();
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(Collections.singletonList(avroFile), segmentIndex, new File(_segmentDir, segmentName),
- segmentTarDir, this._tableName, executor);
- executor.shutdown();
- executor.awaitTermination(1L, TimeUnit.MINUTES);
-
- uploadSegments(getTableName(), segmentTarDir);
-
- FileUtils.forceDelete(avroFile);
- FileUtils.forceDelete(segmentTarDir);
- }
-
- @DataProvider(name = "configProvider")
- public Object[][] configProvider() {
- Object[][] configs = {{"mytable", SegmentVersion.v1}, {"yourtable", SegmentVersion.v3}};
- return configs;
- }
-
- @Test(dataProvider = "configProvider")
- public void testRefresh(String tableName, SegmentVersion version)
- throws Exception {
- final int nAtttempts = 5;
- final String segment6 = "segmentToBeRefreshed_6";
- final int nRows1 = 69;
- generateAndUploadRandomSegment(segment6, nRows1);
- verifyNRows(0, nRows1);
- final int nRows2 = 198;
- LOGGER.info("Segment {} loaded with {} rows, refreshing with {}", segment6, nRows1, nRows2);
- generateAndUploadRandomSegment(segment6, nRows2);
- verifyNRows(nRows1, nRows2);
- // Load another segment while keeping this one in place.
- final String segment9 = "newSegment_9";
- final int nRows3 = 102;
- generateAndUploadRandomSegment(segment9, nRows3);
- verifyNRows(nRows2, nRows2 + nRows3);
- }
-
- // Verify that the number of rows is either the initial value or the final value but not something else.
- private void verifyNRows(int currentNrows, int finalNrows)
- throws Exception {
- int attempt = 0;
- long sleepTime = 100;
- long nRows;
- while (attempt < 10) {
- Thread.sleep(sleepTime);
- try {
- nRows = getCurrentCountStarResult();
- } catch (Exception e) {
- nRows = -1;
- }
- //nRows can either be the current value or the final value, not any other.
- if (nRows == currentNrows || nRows == -1) {
- sleepTime *= 2;
- attempt++;
- } else if (nRows == finalNrows) {
- return;
- } else {
- Assert.fail("Found unexpected number of rows " + nRows);
- }
- }
- Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
- }
-
- @Test(enabled = false, dataProvider = "configProvider")
- public void testUploadRefreshDelete(String tableName, SegmentVersion version)
- throws Exception {
- final int THREAD_COUNT = 1;
- final int SEGMENT_COUNT = 5;
-
- final int MIN_ROWS_PER_SEGMENT = 500;
- final int MAX_ROWS_PER_SEGMENT = 1000;
-
- final int OPERATIONS_PER_ITERATION = 10;
- final int ITERATION_COUNT = 5;
-
- final double UPLOAD_PROBABILITY = 0.8d;
-
- final String[] segmentNames = new String[SEGMENT_COUNT];
- final int[] segmentRowCounts = new int[SEGMENT_COUNT];
-
- for (int i = 0; i < SEGMENT_COUNT; i++) {
- segmentNames[i] = "segment_" + i;
- segmentRowCounts[i] = 0;
- }
-
- for (int i = 0; i < ITERATION_COUNT; i++) {
- // Create THREAD_COUNT threads
- ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
-
- // Submit OPERATIONS_PER_ITERATION uploads/deletes
- for (int j = 0; j < OPERATIONS_PER_ITERATION; j++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- ThreadLocalRandom random = ThreadLocalRandom.current();
-
- // Pick a random segment
- int segmentIndex = random.nextInt(SEGMENT_COUNT);
- String segmentName = segmentNames[segmentIndex];
-
- // Pick a random operation
- if (random.nextDouble() < UPLOAD_PROBABILITY) {
- // Upload this segment
- LOGGER.info("Will upload segment {}", segmentName);
-
- synchronized (segmentName) {
- // Create a segment with a random number of rows
- int segmentRowCount = random.nextInt(MIN_ROWS_PER_SEGMENT, MAX_ROWS_PER_SEGMENT);
- LOGGER.info("Generating and uploading segment {} with {} rows", segmentName, segmentRowCount);
- generateAndUploadRandomSegment(segmentName, segmentRowCount);
-
- // Store the number of rows
- LOGGER.info("Uploaded segment {} with {} rows", segmentName, segmentRowCount);
- segmentRowCounts[segmentIndex] = segmentRowCount;
- }
- } else {
- // Delete this segment
- LOGGER.info("Will delete segment {}", segmentName);
-
- synchronized (segmentName) {
- // Delete this segment
- LOGGER.info("Deleting segment {}", segmentName);
- String reply = sendDeleteRequest(_controllerRequestURLBuilder.
- forSegmentDelete("myresource", segmentName));
- LOGGER.info("Deletion returned {}", reply);
-
- // Set the number of rows to zero
- LOGGER.info("Deleted segment {}", segmentName);
- segmentRowCounts[segmentIndex] = 0;
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- // Await for all tasks to complete
- executorService.shutdown();
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
-
- // Count number of expected rows
- int expectedRowCount = 0;
- for (int segmentRowCount : segmentRowCounts) {
- expectedRowCount += segmentRowCount;
- }
-
- // Wait for up to one minute for the row count to match the expected row count
- LOGGER.info("Awaiting for the row count to match {}", expectedRowCount);
- int pinotRowCount = (int) getCurrentCountStarResult();
- long timeInOneMinute = System.currentTimeMillis() + 60 * 1000L;
- while (System.currentTimeMillis() < timeInOneMinute && pinotRowCount != expectedRowCount) {
- LOGGER.info("Row count is {}, expected {}, awaiting for row count to match", pinotRowCount, expectedRowCount);
- Thread.sleep(5000L);
-
- try {
- pinotRowCount = (int) getCurrentCountStarResult();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while sending query to Pinot, retrying", e);
- }
- }
-
- // Compare row counts
- Assert.assertEquals(pinotRowCount, expectedRowCount,
- "Expected and actual row counts don't match after waiting one minute");
- }
- }
-
- @AfterClass
- public void tearDown() {
- stopServer();
- stopBroker();
- stopController();
- stopZk();
- FileUtils.deleteQuietly(_tempDir);
- }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/server/util/SegmentTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/server/util/SegmentTestUtils.java
deleted file mode 100644
index 2a3e88f..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/server/util/SegmentTestUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.FileFormat;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-
-
-public class SegmentTestUtils {
- private SegmentTestUtils() {
- }
-
- @Nonnull
- public static SegmentGeneratorConfig getSegmentGeneratorConfig(@Nonnull File inputAvro, @Nonnull File outputDir,
- @Nonnull TimeUnit timeUnit, @Nonnull String tableName, @Nullable Schema pinotSchema)
- throws IOException {
- if (pinotSchema == null) {
- pinotSchema = AvroUtils.getPinotSchemaFromAvroDataFile(inputAvro);
- }
- TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName);
- // TODO: change to getDateTimeFieldSpecs().get(0)
- if (pinotSchema.getTimeFieldSpec() != null) {
- tableConfigBuilder.setTimeColumnName(pinotSchema.getTimeFieldSpec().getName());
- }
- TableConfig tableConfig = tableConfigBuilder.build();
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, pinotSchema);
-
- segmentGeneratorConfig.setInputFilePath(inputAvro.getAbsolutePath());
- segmentGeneratorConfig.setSegmentTimeUnit(timeUnit);
- if (inputAvro.getName().endsWith("gz")) {
- segmentGeneratorConfig.setFormat(FileFormat.GZIPPED_AVRO);
- } else {
- segmentGeneratorConfig.setFormat(FileFormat.AVRO);
- }
- segmentGeneratorConfig.setSegmentVersion(SegmentVersion.v1);
- segmentGeneratorConfig.setTableName(tableName);
- segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
-
- return segmentGeneratorConfig;
- }
-}
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 74ffc21..46fa2d9 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -94,7 +94,6 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
@@ -174,10 +173,6 @@
<name>pinot-RawIndexBenchmark</name>
</program>
<program>
- <mainClass>org.apache.pinot.perf.RealtimeStressTest</mainClass>
- <name>pinot-RealtimeStressTest</name>
- </program>
- <program>
<mainClass>org.apache.pinot.perf.StringDictionaryPerfTest</mainClass>
<name>pinot-StringDictionaryPerfTest</name>
</program>
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 77e781f..e3196cd 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -19,16 +19,10 @@
package org.apache.pinot.perf;
import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
@@ -37,13 +31,9 @@ import org.apache.pinot.util.TestUtils;
* Benchmark that writes a configurable amount of rows in Kafka and checks how much time it takes to consume all of
* them.
*/
-public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegrationTest {
+public class BenchmarkRealtimeConsumptionSpeed extends BaseClusterIntegrationTest {
private static final int ROW_COUNT = 100_000;
- private static final int ROW_COUNT_FOR_SEGMENT_FLUSH = 10_000;
private static final long TIMEOUT_MILLIS = 20 * 60 * 1000L; // Twenty minutes
- private final File _tmpDir = new File("/tmp/" + getHelixClusterName());
- private static final int SEGMENT_COUNT = 1;
- private static final Random RANDOM = new Random(123456L);
public static void main(String[] args) {
try {
@@ -56,50 +46,34 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
private void runBenchmark()
throws Exception {
- // Start ZK and Kafka
- startZk();
- StreamDataServerStartable kafkaStarter = KafkaStarterUtils
- .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
- KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
- // Create Kafka topic
- kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(10));
-
- // Unpack data (needed to get the Avro schema)
- TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
- RealtimeClusterIntegrationTest.class.getClassLoader()
- .getResource("On_Time_On_Time_Performance_2014_100k_subset_nonulls.tar.gz"))), _tmpDir);
-
- _tmpDir.mkdirs();
- final List<File> avroFiles = new ArrayList<File>(SEGMENT_COUNT);
- for (int segmentNumber = 1; segmentNumber <= SEGMENT_COUNT; ++segmentNumber) {
- avroFiles.add(new File(_tmpDir.getPath() + "/On_Time_On_Time_Performance_2014_" + segmentNumber + ".avro"));
- }
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
// Start the Pinot cluster
+ startZk();
startController();
startBroker();
startServer();
- // Create realtime table
- setUpRealtimeTable(avroFiles.get(0));
+ // Start Kafka
+ startKafka();
- // Wait a couple of seconds for all Helix state transitions to happen
- Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+ // Unpack the Avro files
+ File avroFile = unpackAvroData(_tempDir).get(0);
+
+ // Create and upload the schema and table config
+ addSchema(createSchema());
+ addTableConfig(createRealtimeTableConfig(avroFile));
// Generate ROW_COUNT rows and write them into Kafka
- new Thread() {
- @Override
- public void run() {
- try {
- ClusterIntegrationTestUtils
- .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
- ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
- } catch (Exception e) {
- // Ignored
- }
+ new Thread(() -> {
+ try {
+ ClusterIntegrationTestUtils
+ .pushRandomAvroIntoKafka(avroFile, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
+ getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+ } catch (Exception e) {
+ // Ignored
}
- }.start();
+ }).start();
// Count how many seconds it takes for select count(*) to match with ROW_COUNT
long startTime = System.currentTimeMillis();
@@ -127,5 +101,6 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
long endTime = System.currentTimeMillis();
System.out.println("Consumed " + ROW_COUNT + " rows in " + (endTime - startTime) / 1000.0 + " seconds");
+ FileUtils.deleteDirectory(_tempDir);
}
}
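
Since the hunk above interleaves removed and added lines, here is the new setup sequence in one place. This is a sketch, not part of the commit: it assumes a subclass of BaseClusterIntegrationTest (so the protected helpers and _tempDir are available), and the row count is an illustrative placeholder.

import java.io.File;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;

public class RealtimeSetupSketch extends BaseClusterIntegrationTest {
  // Sketch only, not part of the commit: the standardized realtime setup flow used by the benchmark.
  private void setUpRealtimeBenchmark() throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);

    // Start the Pinot cluster and Kafka
    startZk();
    startController();
    startBroker();
    startServer();
    startKafka();

    // Unpack the bundled Avro data, then register the schema and a realtime table config derived from it
    File avroFile = unpackAvroData(_tempDir).get(0);
    addSchema(createSchema());
    addTableConfig(createRealtimeTableConfig(avroFile));

    // Produce rows into Kafka for the realtime table to consume (100_000 is a placeholder)
    ClusterIntegrationTestUtils.pushRandomAvroIntoKafka(avroFile, KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
        getKafkaTopic(), 100_000, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
  }
}
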
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
deleted file mode 100644
index 7d239df..0000000
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.perf;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
-import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
-
-
-/**
- * Stress test that writes an infinite amount of data in Kafka for a given duration until Pinot breaks.
- */
-public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
- private static final int ROW_COUNT = 100_000;
- private static final int MIN_ROW_COUNT = 100_000;
- private static final int ROW_COUNT_FOR_SEGMENT_FLUSH = 10_000;
- private static final long TIMEOUT_MILLIS = 20 * 60 * 1000L; // Twenty minutes
- private final File _tmpDir = new File("/tmp/" + getHelixClusterName());
- private static final int SEGMENT_COUNT = 1;
- private static final Random RANDOM = new Random(123456L);
- private static long rowsWritten = 0L;
-
- public static void main(String[] args) {
- try {
- new RealtimeStressTest().runBenchmark();
- } catch (Exception e) {
- System.exit(-1);
- }
- }
-
- private void runBenchmark()
- throws Exception {
- // Start ZK and Kafka
- startZk();
- StreamDataServerStartable kafkaStarter = KafkaStarterUtils
- .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
- KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
- // Create Kafka topic
- kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(10));
-
- // Unpack data (needed to get the Avro schema)
- TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
- RealtimeClusterIntegrationTest.class.getClassLoader()
- .getResource("On_Time_On_Time_Performance_2014_100k_subset_nonulls.tar.gz"))), _tmpDir);
-
- _tmpDir.mkdirs();
- final List<File> avroFiles = new ArrayList<File>(SEGMENT_COUNT);
- for (int segmentNumber = 1; segmentNumber <= SEGMENT_COUNT; ++segmentNumber) {
- avroFiles.add(new File(_tmpDir.getPath() + "/On_Time_On_Time_Performance_2014_" + segmentNumber + ".avro"));
- }
-
- File schemaFile = new File(OfflineClusterIntegrationTest.class.getClassLoader()
- .getResource("On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema").getFile());
-
- // Start the Pinot cluster
- startController();
- startBroker();
- startServer();
-
- // Create realtime table
- setUpRealtimeTable(avroFiles.get(0));
-
- // Wait a couple of seconds for all Helix state transitions to happen
- Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
-
- // Generate ROW_COUNT rows and write them into Kafka
- ClusterIntegrationTestUtils
- .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
- rowsWritten += ROW_COUNT;
-
- // Run forever until something breaks or the timeout completes
- long pinotRecordCount = -1L;
- long timeAfterTimeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
- do {
- Thread.sleep(500L);
-
- // Run the query
- try {
- JsonNode response = postQuery("select count(*) from mytable");
- pinotRecordCount = response.get("aggregationResults").get(0).get("value").asLong();
- } catch (Exception e) {
- // Ignore
- continue;
- }
-
- // Write more rows if needed
- if (rowsWritten - pinotRecordCount < MIN_ROW_COUNT) {
- ClusterIntegrationTestUtils
- .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
- ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
- rowsWritten += ROW_COUNT;
- }
-
- System.out.println("Pinot record count: " + pinotRecordCount);
- if (timeAfterTimeout < System.currentTimeMillis()) {
- throw new RuntimeException("Timeout exceeded!");
- }
- } while (true);
- }
-}