You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/09/02 09:15:07 UTC
[pinot] branch master updated: Allow ingestion of errored records with incorrect datatype (#9320)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6047b06ac6 Allow ingestion of errored records with incorrect datatype (#9320)
6047b06ac6 is described below
commit 6047b06ac6f62ab7349e854a94f50199512ad973
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Sep 2 14:44:59 2022 +0530
Allow ingestion of errored records with incorrect datatype (#9320)
* Allow ingestion of errored records with incorrect datatype
* Add new config to SegmentGenerator as well
* Handle outOfRange timestamps and replace them with nulls
* Rename config and add a separate config for timeValue check
* Add exceptions for incorrect time columns
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../recordtransformer/CompositeTransformer.java | 2 +-
.../recordtransformer/DataTypeTransformer.java | 52 +++++++++++++-
.../ExpressionTransformerTest.java | 2 +-
.../recordtransformer/RecordTransformerTest.java | 84 +++++++++++++++++++++-
.../spi/creator/SegmentGeneratorConfig.java | 20 ++++++
.../pinot/spi/config/table/IndexingConfig.java | 18 +++++
.../spi/utils/builder/TableConfigBuilder.java | 14 ++++
7 files changed, 187 insertions(+), 5 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index e21340a0fb..563359aa09 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -61,7 +61,7 @@ public class CompositeTransformer implements RecordTransformer {
public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) {
return new CompositeTransformer(Arrays
.asList(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
- new DataTypeTransformer(schema), new NullValueTransformer(tableConfig, schema),
+ new DataTypeTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
new SanitizationTransformer(schema)));
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index de7a6795d7..14f68f1394 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -27,10 +27,17 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -40,14 +47,33 @@ import org.apache.pinot.spi.data.readers.GenericRow;
*/
@SuppressWarnings("rawtypes")
public class DataTypeTransformer implements RecordTransformer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeTransformer.class);
+
private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
+ private final boolean _continueOnError;
+ private final boolean _validateTimeValues;
+ private final String _timeColumnName;
+ private final DateTimeFormatSpec _timeFormatSpec;
- public DataTypeTransformer(Schema schema) {
+ public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
if (!fieldSpec.isVirtualColumn()) {
_dataTypes.put(fieldSpec.getName(), PinotDataType.getPinotDataTypeForIngestion(fieldSpec));
}
}
+
+ _continueOnError = tableConfig.getIndexingConfig().isContinueOnError();
+ _validateTimeValues = tableConfig.getIndexingConfig().isValidateTimeValue();
+ _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+
+ DateTimeFormatSpec timeColumnSpec = null;
+ if (StringUtils.isNotEmpty(_timeColumnName)) {
+ DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName);
+ Preconditions.checkState(dateTimeFieldSpec != null, "Failed to find spec for time column: %s from schema: %s",
+ _timeColumnName, schema.getSchemaName());
+ timeColumnSpec = dateTimeFieldSpec.getFormatSpec();
+ }
+ _timeFormatSpec = timeColumnSpec;
}
@Override
@@ -59,6 +85,23 @@ public class DataTypeTransformer implements RecordTransformer {
if (value == null) {
continue;
}
+
+ if (_validateTimeValues && _timeFormatSpec != null && column.equals(_timeColumnName)) {
+ long timeInMs = _timeFormatSpec.fromFormatToMillis(value.toString());
+ if (!TimeUtils.timeValueInValidRange(timeInMs)) {
+ if (_continueOnError) {
+ LOGGER.debug("Time value {} is not in valid range for column: {}, must be between: {}", timeInMs,
+ _timeColumnName, TimeUtils.VALID_TIME_INTERVAL);
+ record.putValue(column, null);
+ continue;
+ } else {
+ throw new RuntimeException(
+ String.format("Time value %s is not in valid range for column: %s, must be between: %s", timeInMs,
+ _timeColumnName, TimeUtils.VALID_TIME_INTERVAL));
+ }
+ }
+ }
+
PinotDataType dest = entry.getValue();
if (dest != PinotDataType.JSON) {
value = standardize(column, value, dest.isSingleValue());
@@ -95,7 +138,12 @@ public class DataTypeTransformer implements RecordTransformer {
record.putValue(column, value);
} catch (Exception e) {
- throw new RuntimeException("Caught exception while transforming data type for column: " + column, e);
+ if (!_continueOnError) {
+ throw new RuntimeException("Caught exception while transforming data type for column: " + column, e);
+ } else {
+ LOGGER.debug("Caught exception while transforming data type for column: {}", column, e);
+ record.putValue(column, null);
+ }
}
}
return record;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 1c37779339..756ea7ae7b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -67,7 +67,7 @@ public class ExpressionTransformerTest {
.setIngestionConfig(ingestionConfig).build();
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema);
- DataTypeTransformer dataTypeTransformer = new DataTypeTransformer(pinotSchema);
+ DataTypeTransformer dataTypeTransformer = new DataTypeTransformer(tableConfig, pinotSchema);
// test functions from schema
GenericRow genericRow = new GenericRow();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index 6cc0d2bfb7..3100165228 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -132,7 +132,7 @@ public class RecordTransformerTest {
@Test
public void testDataTypeTransformer() {
- RecordTransformer transformer = new DataTypeTransformer(SCHEMA);
+ RecordTransformer transformer = new DataTypeTransformer(TABLE_CONFIG, SCHEMA);
GenericRow record = getRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
record = transformer.transform(record);
@@ -161,6 +161,88 @@ public class RecordTransformerTest {
}
}
+ @Test
+ public void testDataTypeTransformerIncorrectDataTypes() {
+ Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.BYTES)
+ .addSingleValueDimension("svLong", DataType.LONG).build();
+
+ RecordTransformer transformer = new DataTypeTransformer(TABLE_CONFIG, schema);
+ GenericRow record = getRecord();
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ assertThrows(() -> transformer.transform(record));
+ }
+
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setContinueOnError(true).setTableName("testTable").build();
+
+ RecordTransformer transformerWithDefaultNulls = new DataTypeTransformer(tableConfig, schema);
+ GenericRow record1 = getRecord();
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ record1 = transformerWithDefaultNulls.transform(record1);
+ assertNotNull(record1);
+ assertNull(record1.getValue("svInt"));
+ }
+ }
+
+ @Test
+ public void testDataTypeTransformerInvalidTimestamp() {
+ // Invalid Timestamp and Validation disabled
+ String timeCol = "timeCol";
+ Schema schema = new Schema.SchemaBuilder().addDateTime(timeCol, DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP",
+ "1:MILLISECONDS").build();
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol).setTableName("testTable").build();
+
+ RecordTransformer transformer = new DataTypeTransformer(tableConfig, schema);
+ GenericRow record = getRecord();
+ record.putValue(timeCol, 1L);
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue(timeCol), 1L);
+ }
+
+ // Invalid Timestamp and Validation enabled
+ tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
+ .setValidateTimeValue(true)
+ .setTableName("testTable").build();
+
+ RecordTransformer transformerWithValidation = new DataTypeTransformer(tableConfig, schema);
+ GenericRow record1 = getRecord();
+ record1.putValue(timeCol, 1L);
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ assertThrows(() -> transformerWithValidation.transform(record1));
+ }
+
+ // Invalid timestamp, validation enabled and ignoreErrors enabled
+ tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
+ .setValidateTimeValue(true)
+ .setContinueOnError(true)
+ .setTableName("testTable").build();
+
+ transformer = new DataTypeTransformer(tableConfig, schema);
+ GenericRow record2 = getRecord();
+ record2.putValue(timeCol, 1L);
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ record2 = transformer.transform(record2);
+ assertNotNull(record2);
+ assertNull(record2.getValue(timeCol));
+ }
+
+ // Valid timestamp
+ transformer = new DataTypeTransformer(TABLE_CONFIG, schema);
+ GenericRow record3 = getRecord();
+ Long currentTimeMillis = System.currentTimeMillis();
+ record3.putValue(timeCol, currentTimeMillis);
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ record3 = transformer.transform(record3);
+ assertNotNull(record3);
+ assertEquals(record3.getValue(timeCol), currentTimeMillis);
+ }
+ }
+
@Test
public void testSanitationTransformer() {
RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index d81e69e18d..9dbfbd2fa2 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -113,6 +113,8 @@ public class SegmentGeneratorConfig implements Serializable {
private boolean _onHeap = false;
private boolean _skipTimeValueCheck = false;
private boolean _nullHandlingEnabled = false;
+ private boolean _continueOnError = false;
+ private boolean _validateTimeValue = false;
private boolean _failOnEmptySegment = false;
private boolean _optimizeDictionaryForMetrics = false;
private double _noDictionarySizeRatioThreshold = DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD;
@@ -223,6 +225,8 @@ public class SegmentGeneratorConfig implements Serializable {
_fstTypeForFSTIndex = tableConfig.getIndexingConfig().getFSTIndexType();
_nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
+ _continueOnError = indexingConfig.isContinueOnError();
+ _validateTimeValue = indexingConfig.isValidateTimeValue();
_optimizeDictionaryForMetrics = indexingConfig.isOptimizeDictionaryForMetrics();
_noDictionarySizeRatioThreshold = indexingConfig.getNoDictionarySizeRatioThreshold();
}
@@ -784,6 +788,22 @@ public class SegmentGeneratorConfig implements Serializable {
_nullHandlingEnabled = nullHandlingEnabled;
}
+ public boolean isContinueOnError() {
+ return _continueOnError;
+ }
+
+ public void setContinueOnError(boolean continueOnError) {
+ _continueOnError = continueOnError;
+ }
+
+ public boolean isValidateTimeValue() {
+ return _validateTimeValue;
+ }
+
+ public void setValidateTimeValue(boolean validateTimeValue) {
+ _validateTimeValue = validateTimeValue;
+ }
+
public boolean isOptimizeDictionaryForMetrics() {
return _optimizeDictionaryForMetrics;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 0211c4c48f..b73649cbeb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -53,6 +53,8 @@ public class IndexingConfig extends BaseJsonConfig {
private SegmentPartitionConfig _segmentPartitionConfig;
private boolean _aggregateMetrics;
private boolean _nullHandlingEnabled;
+ private boolean _continueOnError;
+ private boolean _validateTimeValue;
/**
* If `optimizeDictionaryForMetrics` enabled, dictionary is not created for the metric columns
@@ -283,6 +285,22 @@ public class IndexingConfig extends BaseJsonConfig {
_nullHandlingEnabled = nullHandlingEnabled;
}
+ public boolean isContinueOnError() {
+ return _continueOnError;
+ }
+
+ public void setContinueOnError(boolean continueOnError) {
+ _continueOnError = continueOnError;
+ }
+
+ public boolean isValidateTimeValue() {
+ return _validateTimeValue;
+ }
+
+ public void setValidateTimeValue(boolean validateTimeValue) {
+ _validateTimeValue = validateTimeValue;
+ }
+
public boolean isOptimizeDictionaryForMetrics() {
return _optimizeDictionaryForMetrics;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index e884d8155f..f2f59a49ef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -97,6 +97,8 @@ public class TableConfigBuilder {
private Map<String, String> _streamConfigs;
private SegmentPartitionConfig _segmentPartitionConfig;
private boolean _nullHandlingEnabled;
+ private boolean _continueOnError;
+ private boolean _validateTimeValue;
private List<String> _varLengthDictionaryColumns;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private List<String> _jsonIndexColumns;
@@ -309,6 +311,16 @@ public class TableConfigBuilder {
return this;
}
+ public TableConfigBuilder setContinueOnError(boolean continueOnError) {
+ _continueOnError = continueOnError;
+ return this;
+ }
+
+ public TableConfigBuilder setValidateTimeValue(boolean validateTimeValue) {
+ _validateTimeValue = validateTimeValue;
+ return this;
+ }
+
public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
_customConfig = customConfig;
return this;
@@ -420,6 +432,8 @@ public class TableConfigBuilder {
indexingConfig.setStreamConfigs(_streamConfigs);
indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
+ indexingConfig.setContinueOnError(_continueOnError);
+ indexingConfig.setValidateTimeValue(_validateTimeValue);
indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
indexingConfig.setJsonIndexColumns(_jsonIndexColumns);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org