You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/08/14 04:56:41 UTC
[incubator-pinot] branch master updated: Fix NPE for aggregate
metrics (#5862)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 449bf94 Fix NPE for aggregate metrics (#5862)
449bf94 is described below
commit 449bf94d0dd9ba7e07f32d44bcc534e95248c321
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Aug 13 21:56:25 2020 -0700
Fix NPE for aggregate metrics (#5862)
For real-time table with aggregate metrics enabled, the metrics will have `null` min/max value in the column stats while creating the segment, which causes `NPE`.
Added the null check to prevent the exception.
Also added an integration test for the aggregate metrics
---
.../creator/impl/SegmentColumnarIndexCreator.java | 25 ++--
.../AggregateMetricsClusterIntegrationTest.java | 130 +++++++++++++++++++++
2 files changed, 138 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 34131ab..233391b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -532,30 +532,21 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
}
- String minValue = columnIndexCreationInfo.getMin().toString();
- String maxValue = columnIndexCreationInfo.getMax().toString();
+ // NOTE: Min/max could be null for real-time aggregate metrics.
+ Object min = columnIndexCreationInfo.getMin();
+ Object max = columnIndexCreationInfo.getMax();
+ if (min != null && max != null) {
+ addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+ }
+
String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
- if (dataType == DataType.STRING) {
- // Check special characters for STRING column
- if (isValidPropertyValue(minValue)) {
- properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
- }
- if (isValidPropertyValue(maxValue)) {
- properties.setProperty(getKeyFor(column, MAX_VALUE), maxValue);
- }
- if (isValidPropertyValue(defaultNullValue)) {
- properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
- }
- } else {
- properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
- properties.setProperty(getKeyFor(column, MAX_VALUE), maxValue);
+ if (isValidPropertyValue(defaultNullValue)) {
properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
}
}
public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, String minValue,
String maxValue) {
- // Check special characters for STRING column
if (isValidPropertyValue(minValue)) {
properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
new file mode 100644
index 0000000..4f6d80f
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that enables aggregate metrics for the LLC real-time table.
+ */
+public class AggregateMetricsClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ // Start Kafka
+ startKafka();
+
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload the schema and table config with reduced number of columns and aggregate metrics on
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
+ .addSingleValueDimension("Origin", DataType.STRING).addMetric("AirTime", DataType.LONG)
+ .addMetric("ArrDelay", DataType.DOUBLE)
+ .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+ addSchema(schema);
+ TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+ indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
+ indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime", "ArrDelay"));
+ indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
+ indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
+ indexingConfig.setAggregateMetrics(true);
+ addTableConfig(tableConfig);
+
+ // Push data into Kafka
+ pushAvroIntoKafka(avroFiles);
+
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @Override
+ protected boolean useLlc() {
+ // NOTE: Aggregate metrics is only available with LLC.
+ return true;
+ }
+
+ @Override
+ protected void waitForAllDocsLoaded(long timeoutMs) {
+ // NOTE: For aggregate metrics, we need to test the aggregation result instead of the document count because
+ // documents can be merged during ingestion.
+ String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResult = postSqlQuery(sql, _brokerBaseApiUrl);
+ JsonNode aggregationResults = queryResult.get("resultTable").get("rows").get(0);
+ return aggregationResults.get(0).asInt() == -165429728 && aggregationResults.get(1).asInt() == -175625957;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ }
+
+ @Test
+ public void testQueries()
+ throws Exception {
+ String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
+ testSqlQuery(sql, Collections.singletonList(sql));
+ sql = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
+ testSqlQuery(sql, Collections.singletonList(sql));
+ sql = "SELECT Origin, SUM(ArrDelay) FROM mytable WHERE Carrier = 'AA' GROUP BY Origin ORDER BY Origin";
+ testSqlQuery(sql, Collections.singletonList(sql));
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ dropRealtimeTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org