You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/11/24 02:58:22 UTC

(pinot) 01/06: Added new SpecialValueTransformer and tests.

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 95d4950dab166373ed9ec58555435852e5f2ffeb
Author: Aishik <ai...@startree.ai>
AuthorDate: Mon Nov 20 19:18:40 2023 +0530

    Added new SpecialValueTransformer and tests.
---
 .../recordtransformer/CompositeTransformer.java    | 11 ++-
 .../recordtransformer/SpecialValueTransformer.java | 96 ++++++++++++++++++++++
 .../recordtransformer/RecordTransformerTest.java   | 49 +++++++++++
 3 files changed, 153 insertions(+), 3 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index b6fb694306..50ca2a97c1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -68,13 +68,18 @@ public class CompositeTransformer implements RecordTransformer {
    *     Optional {@link SanitizationTransformer} after {@link NullValueTransformer} so that before sanitation, all
    *     values are non-null and follow the data types defined in the schema
    *   </li>
+   *   <li>
+   *     {@link SpecialValueTransformer} after {@link DataTypeTransformer} so that we already have the values complying
+   *      with the schema before handling special values
+   *   </li>
    * </ul>
    */
   public static List<RecordTransformer> getDefaultTransformers(TableConfig tableConfig, Schema schema) {
     return Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
-        new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
-        new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
-        new SanitizationTransformer(schema)).filter(t -> !t.isNoOp()).collect(Collectors.toList());
+            new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
+            new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema),
+            new SpecialValueTransformer(schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp())
+        .collect(Collectors.toList());
   }
 
   public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java
new file mode 100644
index 0000000000..2bd5b9920d
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import java.util.HashSet;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * The {@code SpecialValueTransformer} class will transform special values to follow certain rules including:
+ * <ul>
+ *   <li>Negative zero (-0.0) should be converted to 0.0</li>
+ *   <li>NaN should be converted to default null</li>
+ * </ul>
+ * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all values follow the data types in
+ * {@link FieldSpec}.
+ */
+public class SpecialValueTransformer implements RecordTransformer {
+  // Names of the non-virtual FLOAT/DOUBLE columns; transform() only inspects these columns.
+  private final HashSet<String> _specialValuesKeySet = new HashSet<>();
+  // Used to resolve the default null value of a column when a NaN has to be replaced.
+  private final Schema _schema;
+
+  /**
+   * Registers every non-virtual FLOAT or DOUBLE column of the schema for special value handling.
+   *
+   * @param schema table schema whose field specs determine the columns to inspect
+   */
+  public SpecialValueTransformer(Schema schema) {
+    _schema = schema;
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      DataType dataType = fieldSpec.getDataType();
+      if (!fieldSpec.isVirtualColumn() && (dataType == DataType.FLOAT || dataType == DataType.DOUBLE)) {
+        _specialValuesKeySet.add(fieldSpec.getName());
+      }
+    }
+  }
+
+  /**
+   * Returns positive zero when the value is a Float/Double negative zero, otherwise returns the same instance.
+   * Raw bit patterns are compared because a primitive {@code -0.0 == 0.0} comparison cannot tell the two apart.
+   */
+  private Object transformNegativeZero(Object value) {
+    if ((value instanceof Float) && (Float.floatToRawIntBits((float) value) == Float.floatToRawIntBits(-0.0f))) {
+      return 0.0f;
+    }
+    if ((value instanceof Double)
+        && (Double.doubleToRawLongBits((double) value) == Double.doubleToRawLongBits(-0.0d))) {
+      return 0.0d;
+    }
+    return value;
+  }
+
+  /**
+   * Returns the default null value configured for the column when the value is a Float/Double NaN, otherwise
+   * returns the same instance.
+   * <p>The replacement is taken from the column's own {@link FieldSpec} rather than from a hard-coded dimension
+   * constant, so that metric columns and columns with a custom default null value get the value declared for them.
+   *
+   * @param column name of the column the value belongs to; must be a column of the schema given at construction
+   * @param value value to inspect
+   */
+  private Object transformNaN(String column, Object value) {
+    boolean isNaN = ((value instanceof Float) && ((Float) value).isNaN())
+        || ((value instanceof Double) && ((Double) value).isNaN());
+    if (isNaN) {
+      return _schema.getFieldSpecFor(column).getDefaultNullValue();
+    }
+    return value;
+  }
+
+  /**
+   * This transformer has nothing to do when the schema has no non-virtual FLOAT/DOUBLE column.
+   */
+  @Override
+  public boolean isNoOp() {
+    return _specialValuesKeySet.isEmpty();
+  }
+
+  /**
+   * Rewrites negative zero and NaN in all registered columns of the record and returns the same record.
+   * Single values are replaced through {@code putValue}; multi-value arrays are updated in place.
+   */
+  @Override
+  public GenericRow transform(GenericRow record) {
+    for (String column : _specialValuesKeySet) {
+      Object value = record.getValue(column);
+      if (value instanceof Float || value instanceof Double) {
+        // Single-valued column.
+        Object transformedValue = transformNaN(column, transformNegativeZero(value));
+        // The helpers hand back the very same instance when nothing changed, so a reference check is enough.
+        if (transformedValue != value) {
+          record.putValue(column, transformedValue);
+        }
+      } else if (value instanceof Object[]) {
+        // Multi-valued column.
+        Object[] values = (Object[]) value;
+        for (int i = 0; i < values.length; i++) {
+          values[i] = transformNaN(column, transformNegativeZero(values[i]));
+        }
+      }
+    }
+    return record;
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index baded1b97a..e48b6605f5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -51,6 +51,13 @@ public class RecordTransformerTest {
       .addSingleValueDimension("svStringWithNullCharacters", DataType.STRING)
       .addSingleValueDimension("svStringWithLengthLimit", DataType.STRING)
       .addMultiValueDimension("mvString1", DataType.STRING).addMultiValueDimension("mvString2", DataType.STRING)
+      // For negative zero and NaN conversions
+      .addSingleValueDimension("svFloatNegativeZero", DataType.FLOAT)
+      .addMultiValueDimension("mvFloatNegativeZero", DataType.FLOAT)
+      .addSingleValueDimension("svDoubleNegativeZero", DataType.DOUBLE)
+      .addMultiValueDimension("mvDoubleNegativeZero", DataType.DOUBLE)
+      .addSingleValueDimension("svFloatNaN", DataType.FLOAT).addMultiValueDimension("mvFloatNaN", DataType.FLOAT)
+      .addSingleValueDimension("svDoubleNaN", DataType.DOUBLE).addMultiValueDimension("mvDoubleNaN", DataType.DOUBLE)
       .build();
   private static final TableConfig TABLE_CONFIG =
       new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
@@ -82,6 +89,14 @@ public class RecordTransformerTest {
     record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0});
     record.putValue("mvString2", new Object[]{123, 123L, 123f, 123.0, "123"});
     record.putValue("svNullString", null);
+    record.putValue("svFloatNegativeZero", -0.00f);
+    record.putValue("svDoubleNegativeZero", -0.00d);
+    record.putValue("mvFloatNegativeZero", new Float[]{-0.0f, 1.0f, 0.0f, 3.0f});
+    record.putValue("mvDoubleNegativeZero", new Double[]{-0.0d, 1.0d, 0.0d, 3.0d});
+    record.putValue("svFloatNaN", Float.NaN);
+    record.putValue("svDoubleNaN", Double.NaN);
+    record.putValue("mvFloatNaN", new Float[]{-0.0f, Float.NaN, 2.0f});
+    record.putValue("mvDoubleNaN", new Double[]{-0.0d, Double.NaN, 2.0d});
     return record;
   }
 
@@ -254,6 +269,28 @@ public class RecordTransformerTest {
     }
   }
 
+  @Test
+  public void testSpecialValueTransformer() {
+    RecordTransformer transformer = new SpecialValueTransformer(SCHEMA);
+    GenericRow record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(Float.floatToRawIntBits((float) record.getValue("svFloatNegativeZero")),
+          Float.floatToRawIntBits(0.0f));
+      assertEquals(Double.doubleToRawLongBits((double) record.getValue("svDoubleNegativeZero")),
+          Double.doubleToRawLongBits(0.0d));
+      assertEquals(record.getValue("mvFloatNegativeZero"), new Float[]{0.0f, 1.0f, 0.0f, 3.0f});
+      assertEquals(record.getValue("mvDoubleNegativeZero"), new Double[]{0.0d, 1.0d, 0.0d, 3.0d});
+      assertEquals(record.getValue("svFloatNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
+      assertEquals(record.getValue("svDoubleNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
+      assertEquals(record.getValue("mvFloatNaN"),
+          new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f});
+      assertEquals(record.getValue("mvDoubleNaN"),
+          new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d});
+    }
+  }
+
   @Test
   public void testScalarOps() {
     IngestionConfig ingestionConfig = new IngestionConfig();
@@ -530,6 +567,18 @@ public class RecordTransformerTest {
       assertEquals(record.getValue("mvString2"), new Object[]{"123", "123", "123.0", "123.0", "123"});
       assertNull(record.getValue("$virtual"));
       assertTrue(record.getNullValueFields().isEmpty());
+      assertEquals(Float.floatToRawIntBits((float) record.getValue("svFloatNegativeZero")),
+          Float.floatToRawIntBits(0.0f));
+      assertEquals(Double.doubleToRawLongBits((double) record.getValue("svDoubleNegativeZero")),
+          Double.doubleToRawLongBits(0.0d));
+      assertEquals(record.getValue("mvFloatNegativeZero"), new Float[]{0.0f, 1.0f, 0.0f, 3.0f});
+      assertEquals(record.getValue("mvDoubleNegativeZero"), new Double[]{0.0d, 1.0d, 0.0d, 3.0d});
+      assertEquals(record.getValue("svFloatNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
+      assertEquals(record.getValue("svDoubleNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
+      assertEquals(record.getValue("mvFloatNaN"),
+          new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f});
+      assertEquals(record.getValue("mvDoubleNaN"),
+          new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d});
     }
 
     // Test empty record


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org