You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original archive page and is not preserved in this copy.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/09/02 09:15:07 UTC

[pinot] branch master updated: Allow ingestion of errored records with incorrect datatype (#9320)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6047b06ac6 Allow ingestion of errored records with incorrect datatype (#9320)
6047b06ac6 is described below

commit 6047b06ac6f62ab7349e854a94f50199512ad973
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Sep 2 14:44:59 2022 +0530

    Allow ingestion of errored records with incorrect datatype (#9320)
    
    * Allow ingestion of errored records with incorrect datatype
    
    * Add new config to SegmentGenerator as well
    
    * Handle outOfRange timestamps and replace them with nulls
    
    * Rename config and add a separate config for timeValue check
    
    * Add exceptions for incorrect time columns
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../recordtransformer/CompositeTransformer.java    |  2 +-
 .../recordtransformer/DataTypeTransformer.java     | 52 +++++++++++++-
 .../ExpressionTransformerTest.java                 |  2 +-
 .../recordtransformer/RecordTransformerTest.java   | 84 +++++++++++++++++++++-
 .../spi/creator/SegmentGeneratorConfig.java        | 20 ++++++
 .../pinot/spi/config/table/IndexingConfig.java     | 18 +++++
 .../spi/utils/builder/TableConfigBuilder.java      | 14 ++++
 7 files changed, 187 insertions(+), 5 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index e21340a0fb..563359aa09 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -61,7 +61,7 @@ public class CompositeTransformer implements RecordTransformer {
   public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) {
     return new CompositeTransformer(Arrays
         .asList(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
-            new DataTypeTransformer(schema), new NullValueTransformer(tableConfig, schema),
+            new DataTypeTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
             new SanitizationTransformer(schema)));
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index de7a6795d7..14f68f1394 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -27,10 +27,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -40,14 +47,33 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  */
 @SuppressWarnings("rawtypes")
 public class DataTypeTransformer implements RecordTransformer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeTransformer.class);
+
   private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
+  private final boolean _continueOnError;
+  private final boolean _validateTimeValues;
+  private final String _timeColumnName;
+  private final DateTimeFormatSpec _timeFormatSpec;
 
-  public DataTypeTransformer(Schema schema) {
+  public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       if (!fieldSpec.isVirtualColumn()) {
         _dataTypes.put(fieldSpec.getName(), PinotDataType.getPinotDataTypeForIngestion(fieldSpec));
       }
     }
+
+    _continueOnError = tableConfig.getIndexingConfig().isContinueOnError();
+    _validateTimeValues = tableConfig.getIndexingConfig().isValidateTimeValue();
+    _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+
+    DateTimeFormatSpec timeColumnSpec = null;
+    if (StringUtils.isNotEmpty(_timeColumnName)) {
+      DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName);
+      Preconditions.checkState(dateTimeFieldSpec != null, "Failed to find spec for time column: %s from schema: %s",
+          _timeColumnName, schema.getSchemaName());
+      timeColumnSpec = dateTimeFieldSpec.getFormatSpec();
+    }
+    _timeFormatSpec = timeColumnSpec;
   }
 
   @Override
@@ -59,6 +85,23 @@ public class DataTypeTransformer implements RecordTransformer {
         if (value == null) {
           continue;
         }
+
+        if (_validateTimeValues && _timeFormatSpec != null && column.equals(_timeColumnName)) {
+          long timeInMs = _timeFormatSpec.fromFormatToMillis(value.toString());
+          if (!TimeUtils.timeValueInValidRange(timeInMs)) {
+            if (_continueOnError) {
+              LOGGER.debug("Time value {} is not in valid range for column: {}, must be between: {}", timeInMs,
+                  _timeColumnName, TimeUtils.VALID_TIME_INTERVAL);
+              record.putValue(column, null);
+              continue;
+            } else {
+              throw new RuntimeException(
+                  String.format("Time value %s is not in valid range for column: %s, must be between: %s", timeInMs,
+                      _timeColumnName, TimeUtils.VALID_TIME_INTERVAL));
+            }
+          }
+        }
+
         PinotDataType dest = entry.getValue();
         if (dest != PinotDataType.JSON) {
           value = standardize(column, value, dest.isSingleValue());
@@ -95,7 +138,12 @@ public class DataTypeTransformer implements RecordTransformer {
 
         record.putValue(column, value);
       } catch (Exception e) {
-        throw new RuntimeException("Caught exception while transforming data type for column: " + column, e);
+        if (!_continueOnError) {
+          throw new RuntimeException("Caught exception while transforming data type for column: " + column, e);
+        } else {
+          LOGGER.debug("Caught exception while transforming data type for column: {}", column, e);
+          record.putValue(column, null);
+        }
       }
     }
     return record;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 1c37779339..756ea7ae7b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -67,7 +67,7 @@ public class ExpressionTransformerTest {
         .setIngestionConfig(ingestionConfig).build();
 
     ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema);
-    DataTypeTransformer dataTypeTransformer = new DataTypeTransformer(pinotSchema);
+    DataTypeTransformer dataTypeTransformer = new DataTypeTransformer(tableConfig, pinotSchema);
 
     // test functions from schema
     GenericRow genericRow = new GenericRow();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index 6cc0d2bfb7..3100165228 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -132,7 +132,7 @@ public class RecordTransformerTest {
 
   @Test
   public void testDataTypeTransformer() {
-    RecordTransformer transformer = new DataTypeTransformer(SCHEMA);
+    RecordTransformer transformer = new DataTypeTransformer(TABLE_CONFIG, SCHEMA);
     GenericRow record = getRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       record = transformer.transform(record);
@@ -161,6 +161,88 @@ public class RecordTransformerTest {
     }
   }
 
+  @Test
+  public void testDataTypeTransformerIncorrectDataTypes() {
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.BYTES)
+        .addSingleValueDimension("svLong", DataType.LONG).build();
+
+    RecordTransformer transformer = new DataTypeTransformer(TABLE_CONFIG, schema);
+    GenericRow record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      assertThrows(() -> transformer.transform(record));
+    }
+
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setContinueOnError(true).setTableName("testTable").build();
+
+    RecordTransformer transformerWithDefaultNulls = new DataTypeTransformer(tableConfig, schema);
+    GenericRow record1 = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record1 = transformerWithDefaultNulls.transform(record1);
+      assertNotNull(record1);
+      assertNull(record1.getValue("svInt"));
+    }
+  }
+
+  @Test
+  public void testDataTypeTransformerInvalidTimestamp() {
+    // Invalid Timestamp and Validation disabled
+    String timeCol = "timeCol";
+    Schema schema = new Schema.SchemaBuilder().addDateTime(timeCol, DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP",
+        "1:MILLISECONDS").build();
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol).setTableName("testTable").build();
+
+    RecordTransformer transformer = new DataTypeTransformer(tableConfig, schema);
+    GenericRow record = getRecord();
+    record.putValue(timeCol, 1L);
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue(timeCol), 1L);
+    }
+
+    // Invalid Timestamp and Validation enabled
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
+            .setValidateTimeValue(true)
+            .setTableName("testTable").build();
+
+    RecordTransformer transformerWithValidation = new DataTypeTransformer(tableConfig, schema);
+    GenericRow record1 = getRecord();
+    record1.putValue(timeCol, 1L);
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      assertThrows(() -> transformerWithValidation.transform(record1));
+    }
+
+    // Invalid timestamp, validation enabled and ignoreErrors enabled
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
+            .setValidateTimeValue(true)
+            .setContinueOnError(true)
+            .setTableName("testTable").build();
+
+    transformer = new DataTypeTransformer(tableConfig, schema);
+    GenericRow record2 = getRecord();
+    record2.putValue(timeCol, 1L);
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record2 = transformer.transform(record2);
+      assertNotNull(record2);
+      assertNull(record2.getValue(timeCol));
+    }
+
+    // Valid timestamp
+    transformer = new DataTypeTransformer(TABLE_CONFIG, schema);
+    GenericRow record3 = getRecord();
+    Long currentTimeMillis = System.currentTimeMillis();
+    record3.putValue(timeCol, currentTimeMillis);
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record3 = transformer.transform(record3);
+      assertNotNull(record3);
+      assertEquals(record3.getValue(timeCol), currentTimeMillis);
+    }
+  }
+
   @Test
   public void testSanitationTransformer() {
     RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index d81e69e18d..9dbfbd2fa2 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -113,6 +113,8 @@ public class SegmentGeneratorConfig implements Serializable {
   private boolean _onHeap = false;
   private boolean _skipTimeValueCheck = false;
   private boolean _nullHandlingEnabled = false;
+  private boolean _continueOnError = false;
+  private boolean _validateTimeValue = false;
   private boolean _failOnEmptySegment = false;
   private boolean _optimizeDictionaryForMetrics = false;
   private double _noDictionarySizeRatioThreshold = DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD;
@@ -223,6 +225,8 @@ public class SegmentGeneratorConfig implements Serializable {
       _fstTypeForFSTIndex = tableConfig.getIndexingConfig().getFSTIndexType();
 
       _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
+      _continueOnError = indexingConfig.isContinueOnError();
+      _validateTimeValue = indexingConfig.isValidateTimeValue();
       _optimizeDictionaryForMetrics = indexingConfig.isOptimizeDictionaryForMetrics();
       _noDictionarySizeRatioThreshold = indexingConfig.getNoDictionarySizeRatioThreshold();
     }
@@ -784,6 +788,22 @@ public class SegmentGeneratorConfig implements Serializable {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
 
+  public boolean isContinueOnError() {
+    return _continueOnError;
+  }
+
+  public void setContinueOnError(boolean continueOnError) {
+    _continueOnError = continueOnError;
+  }
+
+  public boolean isValidateTimeValue() {
+    return _validateTimeValue;
+  }
+
+  public void setValidateTimeValue(boolean validateTimeValue) {
+    _validateTimeValue = validateTimeValue;
+  }
+
   public boolean isOptimizeDictionaryForMetrics() {
     return _optimizeDictionaryForMetrics;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 0211c4c48f..b73649cbeb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -53,6 +53,8 @@ public class IndexingConfig extends BaseJsonConfig {
   private SegmentPartitionConfig _segmentPartitionConfig;
   private boolean _aggregateMetrics;
   private boolean _nullHandlingEnabled;
+  private boolean _continueOnError;
+  private boolean _validateTimeValue;
 
   /**
    * If `optimizeDictionaryForMetrics` enabled, dictionary is not created for the metric columns
@@ -283,6 +285,22 @@ public class IndexingConfig extends BaseJsonConfig {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
 
+  public boolean isContinueOnError() {
+    return _continueOnError;
+  }
+
+  public void setContinueOnError(boolean continueOnError) {
+    _continueOnError = continueOnError;
+  }
+
+  public boolean isValidateTimeValue() {
+    return _validateTimeValue;
+  }
+
+  public void setValidateTimeValue(boolean validateTimeValue) {
+    _validateTimeValue = validateTimeValue;
+  }
+
   public boolean isOptimizeDictionaryForMetrics() {
     return _optimizeDictionaryForMetrics;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index e884d8155f..f2f59a49ef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -97,6 +97,8 @@ public class TableConfigBuilder {
   private Map<String, String> _streamConfigs;
   private SegmentPartitionConfig _segmentPartitionConfig;
   private boolean _nullHandlingEnabled;
+  private boolean _continueOnError;
+  private boolean _validateTimeValue;
   private List<String> _varLengthDictionaryColumns;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private List<String> _jsonIndexColumns;
@@ -309,6 +311,16 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setContinueOnError(boolean continueOnError) {
+    _continueOnError = continueOnError;
+    return this;
+  }
+
+  public TableConfigBuilder setValidateTimeValue(boolean validateTimeValue) {
+    _validateTimeValue = validateTimeValue;
+    return this;
+  }
+
   public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
     return this;
@@ -420,6 +432,8 @@ public class TableConfigBuilder {
     indexingConfig.setStreamConfigs(_streamConfigs);
     indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
     indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
+    indexingConfig.setContinueOnError(_continueOnError);
+    indexingConfig.setValidateTimeValue(_validateTimeValue);
     indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
     indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
     indexingConfig.setJsonIndexColumns(_jsonIndexColumns);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org