You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/10/18 22:21:00 UTC
[pinot] branch master updated: fix DataSchema thread-safe issue (#9619)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c3a5280951 fix DataSchema thread-safe issue (#9619)
c3a5280951 is described below
commit c3a5280951145e7f1831d2614a45c5c1e5314662
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Oct 18 15:20:55 2022 -0700
fix DataSchema thread-safe issue (#9619)
* fix DataSchema thread-safety issue by using cached storedColumnDataType
* fix upgradeToCover: it mutated the schema in place, which could change the underlying storedColumnDataType; it now returns a new DataSchema instead
* make use of an enum field reference rather than switch statements
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../org/apache/pinot/common/utils/DataSchema.java | 66 ++++++++++++----------
.../apache/pinot/common/utils/DataSchemaTest.java | 2 +-
.../query/reduce/SelectionDataTableReducer.java | 2 +-
.../selection/SelectionOperatorServiceTest.java | 2 +-
4 files changed, 39 insertions(+), 33 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index dbb56afa0d..57565751f0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -83,16 +83,21 @@ public class DataSchema {
return _columnDataTypes;
}
+ /**
+ * Lazily computes and caches the _storedColumnDataTypes field.
+ */
@JsonIgnore
public ColumnDataType[] getStoredColumnDataTypes() {
- if (_storedColumnDataTypes == null) {
+ ColumnDataType[] storedColumnDataTypes = _storedColumnDataTypes;
+ if (storedColumnDataTypes == null) {
int numColumns = _columnDataTypes.length;
- _storedColumnDataTypes = new ColumnDataType[numColumns];
+ storedColumnDataTypes = new ColumnDataType[numColumns];
for (int i = 0; i < numColumns; i++) {
- _storedColumnDataTypes[i] = _columnDataTypes[i].getStoredType();
+ storedColumnDataTypes[i] = _columnDataTypes[i].getStoredType();
}
+ _storedColumnDataTypes = storedColumnDataTypes;
}
- return _storedColumnDataTypes;
+ return storedColumnDataTypes;
}
/**
@@ -126,29 +131,34 @@ public class DataSchema {
* <code>LONG</code>.
* <p>NOTE: The given data schema should be type compatible with this one.
*
+ * @param originalSchema the original data schema; it is not modified, an upgraded copy is returned
* @param anotherDataSchema Data schema to cover
*/
- public void upgradeToCover(DataSchema anotherDataSchema) {
- int numColumns = _columnDataTypes.length;
+ public static DataSchema upgradeToCover(DataSchema originalSchema, DataSchema anotherDataSchema) {
+ int numColumns = originalSchema._columnDataTypes.length;
+ ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
for (int i = 0; i < numColumns; i++) {
- ColumnDataType thisColumnDataType = _columnDataTypes[i];
+ ColumnDataType thisColumnDataType = originalSchema._columnDataTypes[i];
ColumnDataType thatColumnDataType = anotherDataSchema._columnDataTypes[i];
if (thisColumnDataType != thatColumnDataType) {
if (thisColumnDataType.isArray()) {
if (thisColumnDataType.isWholeNumberArray() && thatColumnDataType.isWholeNumberArray()) {
- _columnDataTypes[i] = ColumnDataType.LONG_ARRAY;
+ columnDataTypes[i] = ColumnDataType.LONG_ARRAY;
} else {
- _columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY;
+ columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY;
}
} else {
if (thisColumnDataType.isWholeNumber() && thatColumnDataType.isWholeNumber()) {
- _columnDataTypes[i] = ColumnDataType.LONG;
+ columnDataTypes[i] = ColumnDataType.LONG;
} else {
- _columnDataTypes[i] = ColumnDataType.DOUBLE;
+ columnDataTypes[i] = ColumnDataType.DOUBLE;
}
}
+ } else {
+ columnDataTypes[i] = originalSchema._columnDataTypes[i];
}
}
+ return new DataSchema(originalSchema._columnNames, columnDataTypes);
}
public byte[] toBytes()
@@ -246,18 +256,18 @@ public class DataSchema {
FLOAT(0f),
DOUBLE(0d),
BIG_DECIMAL(BigDecimal.ZERO),
- BOOLEAN(0) /* Stored as INT */,
- TIMESTAMP(0L) /* Stored as LONG */,
+ BOOLEAN(INT, 0),
+ TIMESTAMP(LONG, 0L),
STRING(""),
- JSON("") /* Stored as STRING */,
+ JSON(STRING, ""),
BYTES(new ByteArray(new byte[0])),
OBJECT(null),
INT_ARRAY(new int[0]),
LONG_ARRAY(new long[0]),
FLOAT_ARRAY(new float[0]),
DOUBLE_ARRAY(new double[0]),
- BOOLEAN_ARRAY(new int[0]) /* Stored as INT_ARRAY */,
- TIMESTAMP_ARRAY(new long[0]) /* Stored as LONG_ARRAY */,
+ BOOLEAN_ARRAY(INT_ARRAY, new int[0]),
+ TIMESTAMP_ARRAY(LONG_ARRAY, new long[0]),
STRING_ARRAY(new String[0]),
BYTES_ARRAY(new byte[0][]);
@@ -270,10 +280,19 @@ public class DataSchema {
EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY);
private static final EnumSet<ColumnDataType> INTEGRAL_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY);
+ // The type values of this column type are stored as (e.g. BOOLEAN is stored as INT); itself otherwise.
+ private final ColumnDataType _storedColumnDataType;
+
// Placeholder for null. We need a placeholder for null so that it can be serialized in the data table
private final Object _nullPlaceholder;
ColumnDataType(Object nullPlaceHolder) {
+ _storedColumnDataType = this;
+ _nullPlaceholder = nullPlaceHolder;
+ }
+
+ ColumnDataType(ColumnDataType storedColumnDataType, Object nullPlaceHolder) {
+ _storedColumnDataType = storedColumnDataType;
_nullPlaceholder = nullPlaceHolder;
}
@@ -285,20 +304,7 @@ public class DataSchema {
* Returns the data type stored in Pinot.
*/
public ColumnDataType getStoredType() {
- switch (this) {
- case BOOLEAN:
- return INT;
- case TIMESTAMP:
- return LONG;
- case JSON:
- return STRING;
- case BOOLEAN_ARRAY:
- return INT_ARRAY;
- case TIMESTAMP_ARRAY:
- return LONG_ARRAY;
- default:
- return this;
- }
+ return _storedColumnDataType;
}
public boolean isNumber() {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index ec18998161..5d6f88e3c7 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -82,7 +82,7 @@ public class DataSchemaTest {
DataSchema incompatibleDataSchema = new DataSchema(anotherColumnNames, COLUMN_DATA_TYPES);
Assert.assertFalse(dataSchema.isTypeCompatibleWith(incompatibleDataSchema));
- dataSchema.upgradeToCover(compatibleDataSchema);
+ dataSchema = DataSchema.upgradeToCover(dataSchema, compatibleDataSchema);
DataSchema upgradedDataSchema = new DataSchema(COLUMN_NAMES, UPGRADED_COLUMN_DATA_TYPES);
Assert.assertEquals(dataSchema, upgradedDataSchema);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index 495995a9fb..4d30f54935 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -118,7 +118,7 @@ public class SelectionDataTableReducer implements DataTableReducer {
droppedServers.add(entry.getKey());
iterator.remove();
} else {
- dataSchema.upgradeToCover(dataSchemaToCompare);
+ dataSchema = DataSchema.upgradeToCover(dataSchema, dataSchemaToCompare);
}
}
return droppedServers;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index 5b74a2b91d..df95b6a5e1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -208,7 +208,7 @@ public class SelectionOperatorServiceTest {
rows.add(_compatibleRow1);
DataSchema dataSchema = _dataSchema.clone();
assertTrue(dataSchema.isTypeCompatibleWith(_compatibleDataSchema));
- dataSchema.upgradeToCover(_compatibleDataSchema);
+ dataSchema = DataSchema.upgradeToCover(dataSchema, _compatibleDataSchema);
assertEquals(dataSchema, _upgradedDataSchema);
DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false);
Object[] expectedRow1 = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org