You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/08/14 04:56:41 UTC

[incubator-pinot] branch master updated: Fix NPE for aggregate metrics (#5862)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 449bf94  Fix NPE for aggregate metrics (#5862)
449bf94 is described below

commit 449bf94d0dd9ba7e07f32d44bcc534e95248c321
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Aug 13 21:56:25 2020 -0700

    Fix NPE for aggregate metrics (#5862)
    
    For a real-time table with aggregate metrics enabled, the metric columns have `null` min/max values in the column stats when the segment is created, which causes an `NPE`.
    Added a null check to prevent the exception.
    Also added an integration test for aggregate metrics.
---
 .../creator/impl/SegmentColumnarIndexCreator.java  |  25 ++--
 .../AggregateMetricsClusterIntegrationTest.java    | 130 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 34131ab..233391b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -532,30 +532,21 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
     }
 
-    String minValue = columnIndexCreationInfo.getMin().toString();
-    String maxValue = columnIndexCreationInfo.getMax().toString();
+    // NOTE: Min/max could be null for real-time aggregate metrics.
+    Object min = columnIndexCreationInfo.getMin();
+    Object max = columnIndexCreationInfo.getMax();
+    if (min != null && max != null) {
+      addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+    }
+
     String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
-    if (dataType == DataType.STRING) {
-      // Check special characters for STRING column
-      if (isValidPropertyValue(minValue)) {
-        properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
-      }
-      if (isValidPropertyValue(maxValue)) {
-        properties.setProperty(getKeyFor(column, MAX_VALUE), maxValue);
-      }
-      if (isValidPropertyValue(defaultNullValue)) {
-        properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
-      }
-    } else {
-      properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
-      properties.setProperty(getKeyFor(column, MAX_VALUE), maxValue);
+    if (isValidPropertyValue(defaultNullValue)) {
       properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
     }
   }
 
   public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, String minValue,
       String maxValue) {
-    // Check special characters for STRING column
     if (isValidPropertyValue(minValue)) {
       properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
     }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
new file mode 100644
index 0000000..4f6d80f
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that enables aggregate metrics for the LLC real-time table.
+ */
+public class AggregateMetricsClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Start Kafka
+    startKafka();
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config with reduced number of columns and aggregate metrics on
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
+            .addSingleValueDimension("Origin", DataType.STRING).addMetric("AirTime", DataType.LONG)
+            .addMetric("ArrDelay", DataType.DOUBLE)
+            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    addSchema(schema);
+    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+    indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
+    indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime", "ArrDelay"));
+    indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
+    indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
+    indexingConfig.setAggregateMetrics(true);
+    addTableConfig(tableConfig);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(avroFiles);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected boolean useLlc() {
+    // NOTE: Aggregate metrics is only available with LLC.
+    return true;
+  }
+
+  @Override
+  protected void waitForAllDocsLoaded(long timeoutMs) {
+    // NOTE: For aggregate metrics, we need to test the aggregation result instead of the document count because
+    //       documents can be merged during ingestion.
+    String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResult = postSqlQuery(sql, _brokerBaseApiUrl);
+        JsonNode aggregationResults = queryResult.get("resultTable").get("rows").get(0);
+        return aggregationResults.get(0).asInt() == -165429728 && aggregationResults.get(1).asInt() == -175625957;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+  }
+
+  @Test
+  public void testQueries()
+      throws Exception {
+    String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT Origin, SUM(ArrDelay) FROM mytable WHERE Carrier = 'AA' GROUP BY Origin ORDER BY Origin";
+    testSqlQuery(sql, Collections.singletonList(sql));
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org