You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/10/18 22:21:00 UTC

[pinot] branch master updated: fix DataSchema thread-safe issue (#9619)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a5280951 fix DataSchema thread-safe issue (#9619)
c3a5280951 is described below

commit c3a5280951145e7f1831d2614a45c5c1e5314662
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Oct 18 15:20:55 2022 -0700

    fix DataSchema thread-safe issue (#9619)
    
    * fix DataSchema thread-safety issue by using cached storedColumnDataType
    * fix upgradeToCover, which could change the underlying storedColumnDataType
    * make use of enum reference rather than switch statements
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../org/apache/pinot/common/utils/DataSchema.java  | 66 ++++++++++++----------
 .../apache/pinot/common/utils/DataSchemaTest.java  |  2 +-
 .../query/reduce/SelectionDataTableReducer.java    |  2 +-
 .../selection/SelectionOperatorServiceTest.java    |  2 +-
 4 files changed, 39 insertions(+), 33 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index dbb56afa0d..57565751f0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -83,16 +83,21 @@ public class DataSchema {
     return _columnDataTypes;
   }
 
+  /**
+   * Lazily computes the _storedColumnDataTypes field.
+   */
   @JsonIgnore
   public ColumnDataType[] getStoredColumnDataTypes() {
-    if (_storedColumnDataTypes == null) {
+    ColumnDataType[] storedColumnDataTypes = _storedColumnDataTypes;
+    if (storedColumnDataTypes == null) {
       int numColumns = _columnDataTypes.length;
-      _storedColumnDataTypes = new ColumnDataType[numColumns];
+      storedColumnDataTypes = new ColumnDataType[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        _storedColumnDataTypes[i] = _columnDataTypes[i].getStoredType();
+        storedColumnDataTypes[i] = _columnDataTypes[i].getStoredType();
       }
+      _storedColumnDataTypes = storedColumnDataTypes;
     }
-    return _storedColumnDataTypes;
+    return storedColumnDataTypes;
   }
 
   /**
@@ -126,29 +131,34 @@ public class DataSchema {
    * <code>LONG</code>.
    * <p>NOTE: The given data schema should be type compatible with this one.
    *
+   * @param originalSchema the original Data schema
    * @param anotherDataSchema Data schema to cover
    */
-  public void upgradeToCover(DataSchema anotherDataSchema) {
-    int numColumns = _columnDataTypes.length;
+  public static DataSchema upgradeToCover(DataSchema originalSchema, DataSchema anotherDataSchema) {
+    int numColumns = originalSchema._columnDataTypes.length;
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
     for (int i = 0; i < numColumns; i++) {
-      ColumnDataType thisColumnDataType = _columnDataTypes[i];
+      ColumnDataType thisColumnDataType = originalSchema._columnDataTypes[i];
       ColumnDataType thatColumnDataType = anotherDataSchema._columnDataTypes[i];
       if (thisColumnDataType != thatColumnDataType) {
         if (thisColumnDataType.isArray()) {
           if (thisColumnDataType.isWholeNumberArray() && thatColumnDataType.isWholeNumberArray()) {
-            _columnDataTypes[i] = ColumnDataType.LONG_ARRAY;
+            columnDataTypes[i] = ColumnDataType.LONG_ARRAY;
           } else {
-            _columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY;
+            columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY;
           }
         } else {
           if (thisColumnDataType.isWholeNumber() && thatColumnDataType.isWholeNumber()) {
-            _columnDataTypes[i] = ColumnDataType.LONG;
+            columnDataTypes[i] = ColumnDataType.LONG;
           } else {
-            _columnDataTypes[i] = ColumnDataType.DOUBLE;
+            columnDataTypes[i] = ColumnDataType.DOUBLE;
           }
         }
+      } else {
+        columnDataTypes[i] = originalSchema._columnDataTypes[i];
       }
     }
+    return new DataSchema(originalSchema._columnNames, columnDataTypes);
   }
 
   public byte[] toBytes()
@@ -246,18 +256,18 @@ public class DataSchema {
     FLOAT(0f),
     DOUBLE(0d),
     BIG_DECIMAL(BigDecimal.ZERO),
-    BOOLEAN(0) /* Stored as INT */,
-    TIMESTAMP(0L) /* Stored as LONG */,
+    BOOLEAN(INT, 0),
+    TIMESTAMP(LONG, 0L),
     STRING(""),
-    JSON("") /* Stored as STRING */,
+    JSON(STRING, ""),
     BYTES(new ByteArray(new byte[0])),
     OBJECT(null),
     INT_ARRAY(new int[0]),
     LONG_ARRAY(new long[0]),
     FLOAT_ARRAY(new float[0]),
     DOUBLE_ARRAY(new double[0]),
-    BOOLEAN_ARRAY(new int[0]) /* Stored as INT_ARRAY */,
-    TIMESTAMP_ARRAY(new long[0]) /* Stored as LONG_ARRAY */,
+    BOOLEAN_ARRAY(INT_ARRAY, new int[0]),
+    TIMESTAMP_ARRAY(LONG_ARRAY, new long[0]),
     STRING_ARRAY(new String[0]),
     BYTES_ARRAY(new byte[0][]);
 
@@ -270,10 +280,19 @@ public class DataSchema {
         EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY);
     private static final EnumSet<ColumnDataType> INTEGRAL_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY);
 
+    // Stored data type: the type itself unless a different stored type is passed to the constructor.
+    private final ColumnDataType _storedColumnDataType;
+
     // Placeholder for null. We need a placeholder for null so that it can be serialized in the data table
     private final Object _nullPlaceholder;
 
     ColumnDataType(Object nullPlaceHolder) {
+      _storedColumnDataType = this;
+      _nullPlaceholder = nullPlaceHolder;
+    }
+
+    ColumnDataType(ColumnDataType storedColumnDataType, Object nullPlaceHolder) {
+      _storedColumnDataType = storedColumnDataType;
       _nullPlaceholder = nullPlaceHolder;
     }
 
@@ -285,20 +304,7 @@ public class DataSchema {
      * Returns the data type stored in Pinot.
      */
     public ColumnDataType getStoredType() {
-      switch (this) {
-        case BOOLEAN:
-          return INT;
-        case TIMESTAMP:
-          return LONG;
-        case JSON:
-          return STRING;
-        case BOOLEAN_ARRAY:
-          return INT_ARRAY;
-        case TIMESTAMP_ARRAY:
-          return LONG_ARRAY;
-        default:
-          return this;
-      }
+      return _storedColumnDataType;
     }
 
     public boolean isNumber() {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index ec18998161..5d6f88e3c7 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -82,7 +82,7 @@ public class DataSchemaTest {
     DataSchema incompatibleDataSchema = new DataSchema(anotherColumnNames, COLUMN_DATA_TYPES);
     Assert.assertFalse(dataSchema.isTypeCompatibleWith(incompatibleDataSchema));
 
-    dataSchema.upgradeToCover(compatibleDataSchema);
+    dataSchema = DataSchema.upgradeToCover(dataSchema, compatibleDataSchema);
     DataSchema upgradedDataSchema = new DataSchema(COLUMN_NAMES, UPGRADED_COLUMN_DATA_TYPES);
     Assert.assertEquals(dataSchema, upgradedDataSchema);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index 495995a9fb..4d30f54935 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -118,7 +118,7 @@ public class SelectionDataTableReducer implements DataTableReducer {
         droppedServers.add(entry.getKey());
         iterator.remove();
       } else {
-        dataSchema.upgradeToCover(dataSchemaToCompare);
+        dataSchema = DataSchema.upgradeToCover(dataSchema, dataSchemaToCompare);
       }
     }
     return droppedServers;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index 5b74a2b91d..df95b6a5e1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -208,7 +208,7 @@ public class SelectionOperatorServiceTest {
     rows.add(_compatibleRow1);
     DataSchema dataSchema = _dataSchema.clone();
     assertTrue(dataSchema.isTypeCompatibleWith(_compatibleDataSchema));
-    dataSchema.upgradeToCover(_compatibleDataSchema);
+    dataSchema = DataSchema.upgradeToCover(dataSchema, _compatibleDataSchema);
     assertEquals(dataSchema, _upgradedDataSchema);
     DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false);
     Object[] expectedRow1 = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org