Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/09/22 17:16:38 UTC

[GitHub] [incubator-pinot] timsants opened a new pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

timsants opened a new pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046


   ## Description
   1. PR for issue [#5507](https://github.com/apache/incubator-pinot/issues/5507). ORC, Thrift, and ProtoBuf readers now convert:
       - Nested structures to Map
       - Collection to Object[]
       - Number/String/ByteBuffer to a single value
   2. All extractors now support extracting all fields if fieldsToRead is null/empty (issue [#5677](https://github.com/apache/incubator-pinot/issues/5677)). This support was
      added to ORCRecordExtractor, ThriftRecordExtractor, ProtoBufRecordExtractor, and CSVRecordExtractor.
   3. Extractor Util Cleanup:
          The extractor converters were implemented separately, with duplicated logic, in RecordReaderUtils,
          JsonRecordExtractorUtils, and AvroUtils. This PR adds a new method, `Object convert(Object value)`, to the
          RecordExtractor interface; all extractors should implement it to convert each field of their file format.
          A new abstract class that implements RecordExtractor now holds the logic that was repeated across
          RecordReaderUtils, JsonRecordExtractorUtils, and AvroUtils. The abstract class also defines the common
          methods for recursively handling maps, collections, records and single values.
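
As a rough illustration of the design described in item 3, the sketch below shows how a format-specific extractor might plug into a shared base class by overriding only the record hooks. It is a simplified stand-in, not the Pinot source: `SketchBaseExtractor`, `SketchStruct` and `SketchStructExtractor` are hypothetical names, and the single-value handling is reduced to numbers and strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the shared base class; not the actual Pinot source.
abstract class SketchBaseExtractor {
  // Dispatches on the runtime shape of the value and recurses into containers.
  Object convert(Object value) {
    if (value instanceof Collection) {
      return convertMultiValue((Collection<?>) value);
    } else if (value instanceof Map) {
      return convertMap((Map<?, ?>) value);
    } else if (isRecord(value)) {
      return convertRecord(value);
    }
    return convertSingleValue(value);
  }

  // Formats with a nested record type override these two hooks.
  protected boolean isRecord(Object value) {
    return false;
  }

  protected Object convertRecord(Object value) {
    throw new UnsupportedOperationException("Record types are not supported by this extractor.");
  }

  protected Object convertMultiValue(Collection<?> values) {
    List<Object> converted = new ArrayList<>();
    for (Object element : values) {
      Object convertedElement = element == null ? null : convert(element);
      if (convertedElement != null) {
        converted.add(convertedElement);
      }
    }
    return converted.isEmpty() ? null : converted.toArray();
  }

  protected Object convertMap(Map<?, ?> map) {
    Map<Object, Object> converted = new HashMap<>();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      if (entry.getKey() == null || entry.getValue() == null) {
        continue;
      }
      Object convertedValue = convert(entry.getValue());
      if (convertedValue != null) {
        // Keys are treated as single values; only map values are converted recursively.
        converted.put(convertSingleValue(entry.getKey()), convertedValue);
      }
    }
    return converted.isEmpty() ? null : converted;
  }

  protected Object convertSingleValue(Object value) {
    return value instanceof Number ? value : value.toString();
  }
}

// Hypothetical nested record type of an imaginary data format.
final class SketchStruct {
  final Map<String, Object> fields = new LinkedHashMap<>();

  SketchStruct with(String name, Object value) {
    fields.put(name, value);
    return this;
  }
}

// Hypothetical extractor that teaches the base class about SketchStruct.
class SketchStructExtractor extends SketchBaseExtractor {
  @Override
  protected boolean isRecord(Object value) {
    return value instanceof SketchStruct;
  }

  @Override
  protected Object convertRecord(Object value) {
    // A struct is a named bag of fields, so the map conversion is reused here.
    return convertMap(((SketchStruct) value).fields);
  }

  public static void main(String[] args) {
    SketchStruct inner = new SketchStruct().with("id", 7);
    SketchStruct outer = new SketchStruct().with("name", "a").with("items", Arrays.asList(inner));
    Map<?, ?> row = (Map<?, ?>) new SketchStructExtractor().convert(outer);
    Object[] items = (Object[]) row.get("items");
    System.out.println(row.get("name") + " " + ((Map<?, ?>) items[0]).get("id")); // prints "a 7"
  }
}
```

The point of the split is that a new format only has to answer two questions (is this value a record, and how are its fields enumerated), while the recursion into lists and maps stays in one place.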
   
   ## Release Notes
   **ORC Records**
   
   Before this PR:
   - All single value ORC types were converted to number/string/byte[]
   - List type as Object[]
   - Map type as Map<Object, Object>
   - There was no case for handling ORC struct types. An IllegalArgumentException would have been thrown if a struct type field was present.
   - Only 1 level of nesting was handled in Map and Array.
   
   After this PR:
   - All single value ORC types are converted to number/string/byte[]
   - List type as Object[]
   - Map type as Map<Object, Object>
   - ORC struct type as Map<Object, Object>
   - Nested extraction is supported for List, Map and Struct types. Only nested Map values are supported (keys are handled as a single value).
   
   **Thrift Records**
   
   Before this PR:
   - All single value Thrift types were converted to number/string/byte[]
   - List types as Object[] with only 1 level of nesting
   - Maps and Thrift structs were converted by calling `.toString()` on them, so nested object structures were not preserved.
   - Field IDs in the Thrift record were assumed to be consecutive, but the Thrift compiler does not enforce this.
   
   After this PR:
   - All single value Thrift types are converted to number/string/byte[]
   - List types as Object[]
   - Map as Map<Object, Object>
   - TBase type (Thrift struct) as Map<Object, Object>
   - Nested extraction is supported for List, Map and Struct types. Only nested Map values are supported (keys are handled as a single value).
   - Field initialization now takes the field IDs from the struct metadata map of the Thrift object, so field IDs no longer need to be consecutive.
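
To make the field ID point concrete, here is a minimal sketch that does not use the Thrift library at all. A plain `Map<Integer, String>` stands in for the declared fields of a struct, and the class and method names (`FieldIdSketch`, `probeConsecutive`, `fromMetadata`) are hypothetical. The first method mirrors the shape of the loop removed from ThriftRecordReader, which probed IDs 1, 2, 3, ... and stopped at the first gap.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class FieldIdSketch {
  // Old approach: probe IDs 1, 2, 3, ... and stop at the first missing ID.
  static Map<String, Integer> probeConsecutive(Map<Integer, String> fieldsById) {
    Map<String, Integer> fieldIds = new LinkedHashMap<>();
    int index = 1;
    String name;
    while ((name = fieldsById.get(index)) != null) {
      fieldIds.put(name, index);
      index++;
    }
    return fieldIds;
  }

  // New approach: walk the declared metadata, so gaps between IDs do not matter.
  static Map<String, Integer> fromMetadata(Map<Integer, String> fieldsById) {
    Map<String, Integer> fieldIds = new LinkedHashMap<>();
    for (Map.Entry<Integer, String> field : fieldsById.entrySet()) {
      fieldIds.put(field.getValue(), field.getKey());
    }
    return fieldIds;
  }

  public static void main(String[] args) {
    // Stand-in for a struct that declares field IDs 1, 2 and 5.
    Map<Integer, String> declared = new TreeMap<>();
    declared.put(1, "id");
    declared.put(2, "name");
    declared.put(5, "tags");

    System.out.println(probeConsecutive(declared)); // prints {id=1, name=2}
    System.out.println(fromMetadata(declared));     // prints {id=1, name=2, tags=5}
  }
}
```

With the probing loop, the `tags` field is silently skipped because ID 3 does not exist.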
   
   **ProtoBuf Records**
   
   Before this PR:
   - All single value ProtoBuf types were converted to number/string/byte[]
   - Repeated type (array) as Object[] with only 1 level of nesting
   - Map types were incorrectly handled as collections, and ProtoBuf Messages were converted by calling `.toString()` on them, so nested object structures were not preserved.
   
   After this PR:
   - All single value ProtoBuf types are converted to number/string/byte[]
   - Repeated type (array) as Object[]
   - Map as Map<Object, Object>
   - ProtoBuf nested messages as Map<String, Object>
   - Nested extraction is supported for List, Map and Struct types. Only nested Map values are supported (keys are handled as a single value).
   
   **Backwards incompatibility**
   With the new extraction support for nested fields and complex objects, if a Thrift, ProtoBuf or ORC file contains Map or Collection fields that hold complex objects, those objects are now retained instead of being converted with `.toString()`. Any client that expects the old treatment of nested fields will therefore be impacted.
   
   In addition, if the `fieldsToRead` param is ever null/empty for the RecordReader, all fields of the record will now be read. Prior to this change, no field would have been read by the RecordReader.
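
The `fieldsToRead` behaviour can be sketched in a few lines. This is only an illustration of the rule stated above, using plain maps in place of Pinot's record and row types; `FieldsToReadSketch` and its `extract` method are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class FieldsToReadSketch {
  // A null or empty fieldsToRead is interpreted as "extract every field".
  static Map<String, Object> extract(Map<String, Object> record, Set<String> fieldsToRead) {
    boolean extractAll = fieldsToRead == null || fieldsToRead.isEmpty();
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : record.entrySet()) {
      if (extractAll || fieldsToRead.contains(field.getKey())) {
        row.put(field.getKey(), field.getValue());
      }
    }
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", 1);
    record.put("name", "a");

    System.out.println(extract(record, Collections.singleton("id"))); // prints {id=1}
    System.out.println(extract(record, null));                        // prints {id=1, name=a}
  }
}
```

A caller that previously relied on an empty set producing an empty row would now receive every field.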
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502626114



##########
File path: pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
##########
@@ -61,12 +62,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    int index = 1;
-    TFieldIdEnum tFieldIdEnum;
-    while ((tFieldIdEnum = tObject.fieldForId(index)) != null) {
-      _fieldIds.put(tFieldIdEnum.getFieldName(), index);
-      index++;
+

Review comment:
       I didn't follow this change, could you explain what's happening here?






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r504138810



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
##########
@@ -95,8 +95,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+
     init();
+
+    if (fieldsToRead == null || fieldsToRead.isEmpty()) {

Review comment:
       I prefer the alternate implementation. Reason being, the RecordExtractor is a public API. They could be used in external applications. The external users should not have to know that all except CSVRecordExtractor expect fieldsToRead upfront.






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r506052351



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   *
+   * @param value should be verified to be a record type prior to calling this method as it will be handled with this
+   *              assumption
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   *
+   * @param value should be verified to be a Collection type prior to calling this method as it will be casted
+   *              to a Collection without checking
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   *
+   * @param value should be verified to be a Map type prior to calling this method as it will be casted to a Map
+   *              without checking
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map<Object, Object> map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object mapKey = entry.getKey();
+      Object mapValue = entry.getValue();
+      if (mapKey != null) {
+        Object convertedMapValue = null;
+        if (mapValue != null) {
+          convertedMapValue = convert(mapValue);
+        }
+
+        if (convertedMapValue != null) {
+          convertedMap.put(convertSingleValue(entry.getKey()), convertedMapValue);
+        }
+      }
+    }
+
+    if (convertedMap.isEmpty()) {
+      return null;
+    }
+
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values.
+   */
+  protected Object convertSingleValue(Object value) {
+    if (value instanceof ByteBuffer) {
+      ByteBuffer byteBufferValue = (ByteBuffer) value;
+
+      // Use byteBufferValue.remaining() instead of byteBufferValue.capacity() so that it still works when buffer is
+      // over-sized
+      byte[] bytesValue = new byte[byteBufferValue.remaining()];
+      byteBufferValue.get(bytesValue);
+      return bytesValue;
+    }
+    if (value instanceof Number) {

Review comment:
       I'll make this change although I don't see any data formats that use this method returning byte[].






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495613899



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -27,8 +28,9 @@
  * 2) Collections become Object[] i.e. multi-value column
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
+ * @param <V> The input record's field to be converted

Review comment:
       Makes sense. Will remove the generic type `V`.






[GitHub] [incubator-pinot] codecov-io commented on pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#issuecomment-709805252


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=h1) Report
   > Merging [#6046](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **decrease** coverage by `21.14%`.
   > The diff coverage is `48.84%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6046/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #6046       +/-   ##
   ===========================================
   - Coverage   66.44%   45.30%   -21.15%     
   ===========================================
     Files        1075     1232      +157     
     Lines       54773    58161     +3388     
     Branches     8168     8604      +436     
   ===========================================
   - Hits        36396    26352    -10044     
   - Misses      15700    29643    +13943     
   + Partials     2677     2166      -511     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #integration | `45.30% <48.84%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...org/apache/pinot/broker/queryquota/HitCounter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9IaXRDb3VudGVyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/broker/queryquota/MaxHitRateTracker.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9NYXhIaXRSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/broker/queryquota/QueryQuotaEntity.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9RdWVyeVF1b3RhRW50aXR5LmphdmE=) | `0.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `26.66% <0.00%> (-30.48%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `22.22% <0.00%> (-26.62%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `24.00% <0.00%> (-10.29%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `64.28% <ø> (-8.89%)` | :arrow_down: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | ... and [1213 more](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=footer). Last update [6275818...bd66189](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r506046635



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   *
+   * @param value should be verified to be a record type prior to calling this method as it will be handled with this
+   *              assumption
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   *
+   * @param value should be verified to be a Collection type prior to calling this method as it will be casted
+   *              to a Collection without checking
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {

Review comment:
       I'll mention this nuance in the method's javadoc.
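
The reviewer's remark is not part of this message, so the following is only a guess at the nuance in question: judging from the quoted line, null elements and elements whose string form is empty are dropped, which means the resulting array can be shorter than the input collection. A minimal sketch of that filtering, with the recursive conversion left out and the hypothetical name `MultiValueFilterSketch`:

```java
import java.util.Arrays;
import java.util.Collection;

class MultiValueFilterSketch {
  // Keeps only elements that are non-null and whose toString() is non-empty.
  static Object[] convertMultiValue(Collection<?> collection) {
    Object[] array = new Object[collection.size()];
    int index = 0;
    for (Object element : collection) {
      if (element != null && !element.toString().equals("")) {
        array[index++] = element;
      }
    }
    // Nothing survived the filter: report null instead of an empty array.
    return index == 0 ? null : Arrays.copyOf(array, index);
  }

  public static void main(String[] args) {
    Object[] kept = convertMultiValue(Arrays.asList("a", null, "", "b"));
    System.out.println(Arrays.toString(kept)); // prints [a, b]
  }
}
```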






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r497972010



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       That's a good point regarding performance. I can move the null handling before calling the method.
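
One possible reading of "move the null handling before calling the method" is sketched below: the caller checks for null once per field, so `convert` can assume a non-null argument. The names `NullCheckSketch`, `convert` and `extract` are hypothetical, and plain maps stand in for the input record and GenericRow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NullCheckSketch {
  // Assumes a non-null argument, so it carries no null branch of its own.
  static Object convert(Object value) {
    return value instanceof Number ? value : value.toString();
  }

  // The caller decides what a null field means before convert is ever invoked.
  static Map<String, Object> extract(Map<String, Object> from) {
    Map<String, Object> to = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : from.entrySet()) {
      Object value = entry.getValue();
      to.put(entry.getKey(), value == null ? null : convert(value));
    }
    return to;
  }

  public static void main(String[] args) {
    Map<String, Object> from = new LinkedHashMap<>();
    from.put("a", null);
    from.put("b", 2);
    System.out.println(extract(from)); // prints {a=null, b=2}
  }
}
```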






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495613434



##########
File path: pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
##########
@@ -45,18 +45,32 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
   @Override
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value)));
+      from.forEach((fieldName, value) -> to.putValue(fieldName, convert(value)));

Review comment:
       Sure I can do that. But do you have any resources showing evidence for this? I thought the underlying implementation for `.forEach` would be similar to `for (T t : iterable)` given that it is not performed on a java `.stream()`.
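
For reference, the two forms under discussion are shown side by side below. The Javadoc for `Map.forEach` describes its default implementation as equivalent to a loop over `entrySet()`, although concrete map classes may override it. The sketch only demonstrates that both forms fill the target identically; it measures nothing about performance. `ForEachSketch` and its `convert` helper are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ForEachSketch {
  // Hypothetical stand-in for the extractor's convert method.
  static Object convert(Object value) {
    return value.toString();
  }

  // Lambda form, as in the quoted diff.
  static Map<String, Object> withForEach(Map<String, Object> from) {
    Map<String, Object> to = new LinkedHashMap<>();
    from.forEach((fieldName, value) -> to.put(fieldName, convert(value)));
    return to;
  }

  // Explicit loop over the entry set.
  static Map<String, Object> withLoop(Map<String, Object> from) {
    Map<String, Object> to = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : from.entrySet()) {
      to.put(entry.getKey(), convert(entry.getValue()));
    }
    return to;
  }

  public static void main(String[] args) {
    Map<String, Object> from = new LinkedHashMap<>();
    from.put("a", 1);
    from.put("b", "x");
    System.out.println(withForEach(from).equals(withLoop(from))); // prints true
  }
}
```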






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495613455



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);

Review comment:
       Sounds good. This would help simplify the JSON extractor.






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503039737



##########
File path: pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
##########
@@ -49,13 +51,53 @@ public GenericRow extract(GenericRecord from, GenericRow to) {
       List<Schema.Field> fields = from.getSchema().getFields();
       for (Schema.Field field : fields) {
         String fieldName = field.name();
-        to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
+        Object value = from.get(fieldName);
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
       }
     } else {
       for (String fieldName : _fields) {
-        to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
+        Object value = from.get(fieldName);
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
       }
     }
     return to;
   }
+
+  /**
+   * Returns whether the object is an Avro GenericRecord.
+   */
+  @Override
+  protected boolean isInstanceOfRecord(Object value) {
+    return value instanceof GenericRecord;
+  }
+
+  /**
+   * Handles the conversion of every field of the Avro GenericRecord.
+   */
+  @Override
+  @Nullable
+  protected Object convertRecord(Object value) {

Review comment:
       That's a good idea. Adding a description to these javadocs.






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503031500



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
##########
@@ -95,8 +95,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+
     init();
+
+    if (fieldsToRead == null || fieldsToRead.isEmpty()) {

Review comment:
       I had the same thought and was debating whether or not to follow the same pattern as the other extractors. I eventually decided to put the "read all fields" logic in the CSVRecordReader because the field names are accessible only through the CSV header and not in the record object being passed to the `extract` method.
   
   The alternative implementation I considered would require setting all the CSV column names in a new variable within `CSVRecordExtractorConfig`. But if `fieldsToRead` is set most of the time, there would be a duplicated, unused `Set` of field names sent to the `CSVRecordExtractor`.
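
The reader-side decision described above can be sketched as follows. This is a hedged illustration only: `CsvFieldSelectionSketch` and `resolveFieldsToExtract` are hypothetical names, and the sketch does not reproduce the actual `CSVRecordReader` code, whose body is cut off in the diff hunk. It assumes the header column names are available to the reader as a list.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class CsvFieldSelectionSketch {
  /**
   * Returns the fields the extractor should be initialized with.
   * Falls back to every header column when no explicit field set is given.
   */
  static Set<String> resolveFieldsToExtract(Set<String> fieldsToRead, List<String> headerColumns) {
    if (fieldsToRead == null || fieldsToRead.isEmpty()) {
      // Only the reader has seen the CSV header, so the fallback lives here.
      return new LinkedHashSet<>(headerColumns);
    }
    return fieldsToRead;
  }

  public static void main(String[] args) {
    List<String> header = Arrays.asList("id", "name", "tags");
    System.out.println(resolveFieldsToExtract(null, header));
    System.out.println(resolveFieldsToExtract(new LinkedHashSet<>(Arrays.asList("name")), header));
  }
}
```

With this shape the extractor always receives a concrete field set, and the header names are copied only in the "read all fields" case.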






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r505918629



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   *
+   * @param value should be verified to be a record type prior to calling this method as it will be handled with this
+   *              assumption
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   *
+   * @param value should be verified to be a Collection type prior to calling this method as it will be casted
+   *              to a Collection without checking
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {

Review comment:
       Shall we add some comments here about the behavior of the empty string?
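
To make the behavior in question concrete, here is a simplified sketch of the filtering done by `convertMultiValue` in the hunk above. It is an illustration under assumptions: the class name is hypothetical, and elements are passed through unchanged instead of being sent through the recursive `convert` call.

```java
import java.util.Arrays;
import java.util.Collection;

final class MultiValueFilterSketch {
  /**
   * Mirrors the filtering in the diff: null elements and elements whose
   * toString() is the empty string are dropped from the resulting array.
   */
  static Object[] convertMultiValue(Collection<?> collection) {
    if (collection.isEmpty()) {
      return null;
    }
    Object[] array = new Object[collection.size()];
    int index = 0;
    for (Object element : collection) {
      // Simplification: the real method calls convert(element) first.
      if (element != null && !element.toString().equals("")) {
        array[index++] = element;
      }
    }
    if (index == 0) {
      return null;
    }
    return index == array.length ? array : Arrays.copyOf(array, index);
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(convertMultiValue(Arrays.asList("a", "", null, "b"))));
  }
}
```

Under this logic a collection such as `["a", "", null, "b"]` yields a two-element array, and a collection holding only nulls or empty strings yields `null`, which is the behavior a code comment would need to spell out.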

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   *
+   * @param value should be verified to be a record type prior to calling this method as it will be handled with this
+   *              assumption
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   *
+   * @param value should be verified to be a Collection type prior to calling this method as it will be casted
+   *              to a Collection without checking
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   *
+   * @param value should be verified to be a Map type prior to calling this method as it will be casted to a Map
+   *              without checking
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map<Object, Object> map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object mapKey = entry.getKey();
+      Object mapValue = entry.getValue();
+      if (mapKey != null) {
+        Object convertedMapValue = null;
+        if (mapValue != null) {
+          convertedMapValue = convert(mapValue);
+        }
+
+        if (convertedMapValue != null) {
+          convertedMap.put(convertSingleValue(entry.getKey()), convertedMapValue);
+        }
+      }
+    }
+
+    if (convertedMap.isEmpty()) {
+      return null;
+    }
+
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values.
+   */
+  protected Object convertSingleValue(Object value) {
+    if (value instanceof ByteBuffer) {
+      ByteBuffer byteBufferValue = (ByteBuffer) value;
+
+      // Use byteBufferValue.remaining() instead of byteBufferValue.capacity() so that it still works when buffer is
+      // over-sized
+      byte[] bytesValue = new byte[byteBufferValue.remaining()];
+      byteBufferValue.get(bytesValue);
+      return bytesValue;
+    }
+    if (value instanceof Number) {

Review comment:
       We should also preserve byte[]
   ```suggestion
       if (value instanceof Number || value instanceof byte[]) {
   ```
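
A short sketch of why the extra `byte[]` check matters. The class name is hypothetical, and the final `toString()` fallback is an assumption, since the hunk above ends before the remainder of `convertSingleValue`. If a `byte[]` reached a `toString()` fallback, the result would be the array's identity string (for example `[B@1b6d3586`) instead of the bytes.

```java
import java.nio.ByteBuffer;

final class SingleValueSketch {
  static Object convertSingleValue(Object value) {
    if (value instanceof ByteBuffer) {
      ByteBuffer buffer = (ByteBuffer) value;
      // remaining() instead of capacity(), so over-sized buffers still work.
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      return bytes;
    }
    if (value instanceof Number || value instanceof byte[]) {
      // Numbers and raw byte arrays are already supported single values.
      return value;
    }
    // Assumed fallback: everything else is represented as a String.
    return value.toString();
  }

  public static void main(String[] args) {
    byte[] raw = {1, 2, 3};
    System.out.println(convertSingleValue(raw) == raw);
    System.out.println(convertSingleValue(new StringBuilder("text")));
  }
}
```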






[GitHub] [incubator-pinot] npawar merged pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar merged pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046


   




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r499902967



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;

Review comment:
       Suggest renaming some methods:
   ```suggestion
       if (isMultiValue(value)) {
         return convertMultiValue(value);
       } else if (isMap(value)) {
         return convertMap(value);
       } else if (isRecord(value)) {
         return convertRecord(value);
       } else {
         return convertSingleValue(value);
       }
   ```

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {

Review comment:
       Please double-check the current behavior of handling empty strings. I think we should include them in the MV array

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the collection is
+   * empty or if no element converts to a non-null, non-empty value.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the map is empty. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {

Review comment:
       Use `map.entrySet()` to avoid the unnecessary lookups
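
       A minimal sketch of the suggested loop shape, not the code that was merged. `EntrySetSketch` and `convertValue` are hypothetical stand-ins for the extractor and its recursive `convert(Object)` call:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the loop inside convertMap; not the PR's final code.
class EntrySetSketch {

  // Placeholder for the extractor's recursive convert(Object) call.
  static Object convertValue(Object value) {
    return value == null ? null : value.toString();
  }

  static Map<Object, Object> convertEntries(Map<?, ?> map) {
    Map<Object, Object> converted = new HashMap<>();
    // Each entry already carries its key and value, so no map.get(key) lookup is needed.
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      converted.put(entry.getKey(), convertValue(entry.getValue()));
    }
    return converted;
  }

  public static void main(String[] args) {
    Map<String, Integer> input = new HashMap<>();
    input.put("a", 1);
    input.put("b", 2);
    System.out.println(convertEntries(input));
  }
}
```

       Iterating over `keySet()` and calling `map.get(key)` performs one extra lookup per key; `entrySet()` yields the key and the value in a single pass.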

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operate on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the collection is
+   * empty or if no element converts to a non-null, non-empty value.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the map is empty. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;

Review comment:
       Return `null` for empty map?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operate on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the collection is
+   * empty or if no element converts to a non-null, non-empty value.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the map is empty. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       `value` should never be `null` here?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operate on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the collection is
+   * empty or if no element converts to a non-null, non-empty value.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the map is empty. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);

Review comment:
       Neither the key nor the value may be `null` inside the map: `null` is not allowed inside an MV (multi-value) column, and maps are handled with MV columns.
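
       One way to read this constraint, as a hedged sketch only. `NullFreeMapSketch` and `convertValue` are hypothetical names, and returning `null` when every entry is dropped is an assumption that mirrors what `convertMultiValue` does for collections:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a convertMap variant that keeps null keys and values out of the result.
class NullFreeMapSketch {

  // Placeholder for the extractor's recursive convert(Object) call.
  static Object convertValue(Object value) {
    return value;
  }

  static Map<Object, Object> convertMap(Map<?, ?> map) {
    if (map.isEmpty()) {
      return null;
    }
    Map<Object, Object> converted = new HashMap<>();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      Object key = entry.getKey();
      Object value = entry.getValue();
      // Skip entries whose key or value is null.
      if (key == null || value == null) {
        continue;
      }
      Object convertedValue = convertValue(value);
      // A nested empty collection or map may convert to null, so check again.
      if (convertedValue != null) {
        converted.put(key, convertedValue);
      }
    }
    // Assumption: treat a map with no surviving entries like an empty map.
    return converted.isEmpty() ? null : converted;
  }
}
```

       With this shape, `convertSingleValue` is never called with a `null` key.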




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r496140943



##########
File path: pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
##########
@@ -45,18 +45,32 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
   @Override
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value)));
+      from.forEach((fieldName, value) -> to.putValue(fieldName, convert(value)));

Review comment:
       I see. Thanks for the background info. I'll change the extractors such that they do not use the lambda expression then.






[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r496310134



##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {

Review comment:
       You can still use the method you added, such as `isInstanceOfRecord`, here. So in the method:
   ```
     protected boolean isInstanceOfRecord(Object value) {
       // Comparing by reference avoids a ClassCastException when value is not a Category.
       return value == TypeDescription.Category.STRUCT;
     }
   ```






[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r494744256



##########
File path: pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
##########
@@ -45,18 +45,32 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
   @Override
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value)));
+      from.forEach((fieldName, value) -> to.putValue(fieldName, convert(value)));

Review comment:
       It'd be good to avoid the functional (lambda) style here because of performance concerns. Please follow the approach that `AvroRecordExtractor` uses. The same applies to the other extractors.
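
       A rough sketch of the loop-based form, under stated assumptions: `LoopExtractSketch` is hypothetical, a plain `Map` stands in for `GenericRow` (so `put` stands in for `putValue`), and `convert` is a placeholder for the extractor's conversion. It is not a copy of `AvroRecordExtractor`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an explicit loop in place of from.forEach((fieldName, value) -> ...).
class LoopExtractSketch {

  // Placeholder for BaseRecordExtractor.convert(Object).
  static Object convert(Object value) {
    return value;
  }

  // 'to' stands in for GenericRow; Map.put stands in for GenericRow.putValue.
  static Map<String, Object> extractAll(Map<String, Object> from, Map<String, Object> to) {
    for (Map.Entry<String, Object> entry : from.entrySet()) {
      to.put(entry.getKey(), convert(entry.getValue()));
    }
    return to;
  }

  public static void main(String[] args) {
    Map<String, Object> from = new HashMap<>();
    from.put("userId", 42);
    from.put("name", "pinot");
    System.out.println(extractAll(from, new HashMap<>()));
  }
}
```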






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495607739



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).

Review comment:
       Yes, I agree. The only use of the generic type `V` is for the Object conversion, but this can easily be handled within the `convertRecord` implementation.
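
       To illustrate the idea, here is a sketch in which the cast lives inside `convertRecord` instead of a second type parameter. `RecordHooksSketch` is a reduced stand-in for the base class, and `PointRecord` and `PointExtractorSketch` are hypothetical names that are not part of the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Reduced stand-in for the base class: only the record hooks are sketched.
abstract class RecordHooksSketch {

  Object convert(Object value) {
    return isInstanceOfRecord(value) ? convertRecord(value) : value;
  }

  protected boolean isInstanceOfRecord(Object value) {
    return false;
  }

  protected Object convertRecord(Object value) {
    throw new UnsupportedOperationException("Record conversion is not supported.");
  }
}

// Hypothetical nested record type of some input format.
class PointRecord {
  final int x;
  final int y;

  PointRecord(int x, int y) {
    this.x = x;
    this.y = y;
  }
}

// The format-specific type appears only in the instanceof check and the cast,
// so the base class does not need a type parameter for it.
class PointExtractorSketch extends RecordHooksSketch {

  @Override
  protected boolean isInstanceOfRecord(Object value) {
    return value instanceof PointRecord;
  }

  @Override
  protected Object convertRecord(Object value) {
    PointRecord point = (PointRecord) value;
    Map<String, Object> fields = new HashMap<>();
    fields.put("x", convert(point.x));
    fields.put("y", convert(point.y));
    return fields;
  }
}
```

       Because `convert` only calls `convertRecord` after `isInstanceOfRecord` returns true, the cast cannot fail on that path.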






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503028177



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operate on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the collection is
+   * empty or if no element converts to a non-null, non-empty value.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       Yes, that is now correct with the changes to map conversion.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       Yes, that is now true with the changes to map conversion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503029865



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
##########
@@ -69,4 +70,32 @@ public GenericRow extract(CSVRecord from, GenericRow to) {
     }
     return to;
   }
+
+  @Override
+  @Nullable
+  public Object convert(@Nullable Object value) {

Review comment:
       You're right, good catch.






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#issuecomment-709805252


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=h1) Report
   > Merging [#6046](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **increase** coverage by `6.41%`.
   > The diff coverage is `59.65%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6046      +/-   ##
   ==========================================
   + Coverage   66.44%   72.85%   +6.41%     
   ==========================================
     Files        1075     1232     +157     
     Lines       54773    58161    +3388     
     Branches     8168     8604     +436     
   ==========================================
   + Hits        36396    42376    +5980     
   + Misses      15700    12977    -2723     
   - Partials     2677     2808     +131     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #integration | `45.30% <48.84%> (?)` | |
   | #unittests | `64.02% <38.03%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `53.33% <0.00%> (-3.81%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `44.44% <0.00%> (-4.40%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `24.00% <0.00%> (-10.29%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `78.57% <ø> (+5.40%)` | :arrow_up: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `98.27% <ø> (-1.73%)` | :arrow_down: |
   | [.../pinot/common/function/DateTimePatternHandler.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vRGF0ZVRpbWVQYXR0ZXJuSGFuZGxlci5qYXZh) | `83.33% <ø> (ø)` | |
   | [...ot/common/function/FunctionDefinitionRegistry.java](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vRnVuY3Rpb25EZWZpbml0aW9uUmVnaXN0cnkuamF2YQ==) | `88.88% <ø> (+44.44%)` | :arrow_up: |
   | ... and [988 more](https://codecov.io/gh/apache/incubator-pinot/pull/6046/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=footer). Last update [6275818...bd66189](https://codecov.io/gh/apache/incubator-pinot/pull/6046?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503035106



##########
File path: pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
##########
@@ -61,12 +62,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    int index = 1;
-    TFieldIdEnum tFieldIdEnum;
-    while ((tFieldIdEnum = tObject.fieldForId(index)) != null) {
-      _fieldIds.put(tFieldIdEnum.getFieldName(), index);
-      index++;
+

Review comment:
       As discussed over Slack, there was a bug in how we were parsing Thrift fields. Thrift field IDs are not guaranteed to be consecutive, which was previously assumed. If a Thrift record did not have consecutive field IDs, the extractor would break or extract values incorrectly.
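
A minimal, self-contained sketch of the failure mode described above, not the actual Pinot or Thrift code. The class `ThriftFieldIdSketch` and both method names are hypothetical, and a plain `Map<Integer, String>` stands in for the Thrift struct metadata. The first method mirrors the removed loop, which probes IDs 1, 2, 3, ... and stops at the first gap. The second shows one possible way to avoid that assumption by walking the declared fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

/** Hypothetical illustration only; a Map stands in for Thrift field metadata. */
class ThriftFieldIdSketch {

  /** Mirrors the removed loop: probe IDs 1, 2, 3, ... and stop at the first ID with no field. */
  static Map<String, Integer> scanConsecutive(IntFunction<String> fieldNameForId) {
    Map<String, Integer> fieldIds = new LinkedHashMap<>();
    int index = 1;
    String name;
    while ((name = fieldNameForId.apply(index)) != null) {
      fieldIds.put(name, index);
      index++;
    }
    return fieldIds;
  }

  /** Walks the declared fields instead, so gaps in the ID sequence do not matter. */
  static Map<String, Integer> fromDeclaredFields(Map<Integer, String> declaredFields) {
    Map<String, Integer> fieldIds = new LinkedHashMap<>();
    for (Map.Entry<Integer, String> entry : declaredFields.entrySet()) {
      fieldIds.put(entry.getValue(), entry.getKey());
    }
    return fieldIds;
  }
}
```

With declared IDs 1, 2 and 5, `scanConsecutive` never reaches the field with ID 5, while `fromDeclaredFields` picks up all three.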






[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r494744256



##########
File path: pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
##########
@@ -45,18 +45,32 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
   @Override
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value)));
+      from.forEach((fieldName, value) -> to.putValue(fieldName, convert(value)));

Review comment:
       It'd be good to avoid the functional style (`forEach` with a lambda) here for performance reasons. Please refer to the approach that `AvroRecordExtractor` uses. The same applies to the other extractors.
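
A rough sketch of the loop-based alternative, assuming the suggestion is to replace the lambda passed to `forEach` with a plain loop over the entries. The class `LoopExtractSketch` is hypothetical, a `HashMap` stands in for `GenericRow`, and `convert` is a placeholder that only stringifies its argument.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; a HashMap stands in for GenericRow. */
class LoopExtractSketch {

  /** Placeholder for the extractor's convert(); here it only stringifies the value. */
  static Object convert(Object value) {
    return String.valueOf(value);
  }

  /** Copies every field using an explicit loop over the entry set instead of forEach with a lambda. */
  static Map<String, Object> extractAll(Map<String, Object> from) {
    Map<String, Object> to = new HashMap<>();
    for (Map.Entry<String, Object> entry : from.entrySet()) {
      to.put(entry.getKey(), convert(entry.getValue()));
    }
    return to;
  }
}
```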






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503031500



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
##########
@@ -95,8 +95,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+
     init();
+
+    if (fieldsToRead == null || fieldsToRead.isEmpty()) {

Review comment:
       I had the same thought and was debating whether to follow the same pattern as the other extractors. I eventually decided to put the "read all fields" logic in the CSVRecordReader because the field names are accessible only through the CSV header, not in the record object passed to the `extract` method.
   
   The alternative implementation I considered would require all the CSV column names to be set in a new variable within `CSVRecordExtractorConfig`. But if `fieldsToRead` is set most of the time, that would be a duplicated, unused `Set` of field names sent to the `CSVRecordExtractor`.
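
To illustrate the first option, here is a small sketch of resolving the field set in the reader before the extractor is initialized. The class `CsvFieldsSketch` and the method `resolveFields` are hypothetical names, and the header is passed in as a plain `String[]` rather than read from a real CSV parser.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical illustration only; the header is supplied directly instead of parsed from a file. */
class CsvFieldsSketch {

  /**
   * Returns the fields the extractor should read. When the caller gives no fields,
   * falls back to every column name found in the CSV header.
   */
  static Set<String> resolveFields(Set<String> fieldsToRead, String[] headerColumns) {
    if (fieldsToRead == null || fieldsToRead.isEmpty()) {
      return new LinkedHashSet<>(Arrays.asList(headerColumns));
    }
    return fieldsToRead;
  }
}
```

Resolving the set once in the reader means the extractor always receives an explicit set of names, and no second copy of the header is carried in the config.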






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r493140341



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -27,8 +28,9 @@
  * 2) Collections become Object[] i.e. multi-value column
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
+ * @param <V> The input record's field to be converted

Review comment:
       Recommend not adding this generic type `V` as in most cases it is `Object` (the field can be of lots of types for the same file format)

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -27,8 +28,9 @@
  * 2) Collections become Object[] i.e. multi-value column
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
+ * @param <V> The input record's field to be converted
  */
-public interface RecordExtractor<T> {
+public interface RecordExtractor<T, V> {
 
   /**
    * Initialize the record extractor with its config

Review comment:
       Put nullable annotation before `fields`

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {

Review comment:
       Suggest renaming it to `BaseRecordExtractor`

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format.
+   */
+  @Nullable
+  protected abstract Object convertRecord(V value);
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = convert(element);
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      convertedMap.put(convertSingleValue(key), convert(map.get(key)));
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       The argument will never be null

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value

Review comment:
       We should return `byte[]` instead of `ByteBuffer`
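
A sketch of what returning `byte[]` could look like for single values, assuming numbers pass through unchanged and everything else is stringified. The class `SingleValueSketch` is hypothetical and this is not the method body from the PR.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration only; not the PR's convertSingleValue implementation. */
class SingleValueSketch {

  /** Converts a non-null single value: ByteBuffer to byte[], Number as-is, anything else to String. */
  static Object convertSingleValue(Object value) {
    if (value instanceof ByteBuffer) {
      // Work on a duplicate so the caller's buffer position is left untouched.
      ByteBuffer buffer = ((ByteBuffer) value).duplicate();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      return bytes;
    }
    if (value instanceof Number) {
      return value;
    }
    return value.toString();
  }
}
```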

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       We might not want to pass `null` into the `convert()`. Check the value before calling `convert()`
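
One way the caller-side check could look, as a sketch under the assumption that null fields are still written to the output row as null. `NullGuardSketch` is a hypothetical class, a `HashMap` stands in for `GenericRow`, and `convert` is a placeholder that assumes a non-null argument.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; a HashMap stands in for GenericRow. */
class NullGuardSketch {

  /** Placeholder converter that assumes a non-null argument. */
  static Object convert(Object value) {
    return value.toString();
  }

  /** The caller checks each value for null, so convert() never receives one. */
  static Map<String, Object> extract(Map<String, Object> from) {
    Map<String, Object> to = new HashMap<>();
    for (Map.Entry<String, Object> entry : from.entrySet()) {
      Object value = entry.getValue();
      to.put(entry.getKey(), value != null ? convert(value) : null);
    }
    return to;
  }
}
```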

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       Are we returning `null` for empty array/collection/map? If so, let's add the behavior to the javadoc and annotate the return value as nullable
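
A sketch of how the empty-to-null behavior might be documented and implemented for collections. `EmptyToNullSketch` is a hypothetical class, and the `@Nullable` annotation is mentioned only in a comment so the example compiles without extra dependencies.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical illustration only; not the PR's convertMultiValue implementation. */
class EmptyToNullSketch {

  /**
   * Converts a collection to an Object[], dropping null elements.
   * Returns {@code null} if the collection is empty or contains only nulls.
   * (In the real interface the return value would carry a @Nullable annotation.)
   */
  static Object[] convertMultiValue(Collection<?> collection) {
    List<Object> kept = new ArrayList<>(collection.size());
    for (Object element : collection) {
      if (element != null) {
        kept.add(element);
      }
    }
    return kept.isEmpty() ? null : kept.toArray();
  }
}
```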

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).

Review comment:
       Not sure how much value this generic type `V` can provide. IMO `convertRecord(Object value)` should be good enough (similar to `convertMap(Object value)` etc.)

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);

Review comment:
       Return `false` for default implementation?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format.
+   */
+  @Nullable
+  protected abstract Object convertRecord(V value);

Review comment:
       Throw `UnsupportedOperationException` for default implementation?
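
       A rough sketch of how the two suggested defaults could fit together, assuming a simplified
       base class. All class names below are hypothetical and the `convert()` dispatch is reduced to
       the record branch only:

```java
// Hypothetical sketch of the two suggested defaults; class names are illustrative.
abstract class ExtractorWithDefaults {

  // Default: the data format has no nested record type.
  protected boolean isInstanceOfRecord(Object value) {
    return false;
  }

  // Default: only reachable if a subclass reports records without converting them.
  protected Object convertRecord(Object value) {
    throw new UnsupportedOperationException("Record conversion is not supported for this data format.");
  }

  public Object convert(Object value) {
    return isInstanceOfRecord(value) ? convertRecord(value) : value;
  }
}

// A flat format needs no overrides at all.
final class FlatExtractor extends ExtractorWithDefaults {
}

// A nested format overrides both methods together.
final class NestedExtractor extends ExtractorWithDefaults {
  static final class Rec {
    final String name;

    Rec(String name) {
      this.name = name;
    }
  }

  @Override
  protected boolean isInstanceOfRecord(Object value) {
    return value instanceof Rec;
  }

  @Override
  protected Object convertRecord(Object value) {
    return ((Rec) value).name;
  }
}

// Overriding only the check surfaces the missing conversion as an exception.
final class HalfImplementedExtractor extends ExtractorWithDefaults {
  @Override
  protected boolean isInstanceOfRecord(Object value) {
    return true;
  }
}
```

       With these defaults, extractors for formats without nested records would not need to implement
       either method.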




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495606046



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {

Review comment:
       Good suggestion. Renamed.






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495614409



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       I prefer handling `null` values in one place if we want consistent handling across all the extractors. But if you think that this treatment of `null` could differ across different data formats/types, we can put the `null` handling before calling `convert()`.
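
       To illustrate the "one place" option, here is a small sketch under the assumption of a shared
       base class. `NullSafeExtractor`, `convertNonNull` and `UpperCaseExtractor` are made-up names for
       illustration only:

```java
import java.util.Locale;

// Hypothetical sketch: null is handled once in the base class.
abstract class NullSafeExtractor {

  // Single entry point; every format gets the same null treatment.
  public final Object convert(Object value) {
    if (value == null) {
      return null;
    }
    return convertNonNull(value);
  }

  // Format-specific hook; implementations may assume a non-null argument.
  protected abstract Object convertNonNull(Object value);
}

// Example format-specific extractor with no null checks of its own.
final class UpperCaseExtractor extends NullSafeExtractor {
  @Override
  protected Object convertNonNull(Object value) {
    return value.toString().toUpperCase(Locale.ROOT);
  }
}
```

       The trade-off is that a format needing different `null` semantics would have to work around the
       shared entry point.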






[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495619489



##########
File path: pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
##########
@@ -45,18 +45,32 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
   @Override
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value)));
+      from.forEach((fieldName, value) -> to.putValue(fieldName, convert(value)));

Review comment:
       About a year ago we had an internal meeting to discuss the performance of the functional versus the procedural style.
   Here's a doc on the performance of the functional style (Java 8 lambdas):
   https://www.beyondjava.net/performance-java-8-lambdas
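
       For comparison, a sketch of the two styles side by side. This uses plain maps instead of
       `GenericRow`, and `convert()` is a placeholder, so it only shows the shape of the change. It makes
       no claim about which variant is faster; that would need to be measured:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for the input record and GenericRow.
final class LoopVsLambdaSketch {

  // Placeholder for the extractor's convert().
  static Object convert(Object value) {
    return String.valueOf(value);
  }

  // Functional style, as in the diff above.
  static Map<String, Object> extractWithLambda(Map<String, Object> from) {
    Map<String, Object> to = new LinkedHashMap<>();
    from.forEach((fieldName, value) -> to.put(fieldName, convert(value)));
    return to;
  }

  // Procedural equivalent using an explicit loop over the entries.
  static Map<String, Object> extractWithLoop(Map<String, Object> from) {
    Map<String, Object> to = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : from.entrySet()) {
      to.put(entry.getKey(), convert(entry.getValue()));
    }
    return to;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("a", 1);
    row.put("b", "x");
    System.out.println(extractWithLambda(row).equals(extractWithLoop(row)));
  }
}
```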






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502567100



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {

Review comment:
       I believe this is in line with the current behavior. I checked AvroUtils, JsonRecordExtractorUtils and RecordReaderUtils, and they all do the same thing.
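
       For reference, a standalone sketch of the filtering being discussed: null and empty-string
       elements are dropped, and the result is `null` when nothing survives. The helper name
       `dropNullAndEmpty` is made up, and the recursive `convert()` call on each element is left out:

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical sketch of the element filtering; not the actual Pinot method.
final class MultiValueFilterSketch {

  static Object[] dropNullAndEmpty(Collection<?> collection) {
    Object[] array = new Object[collection.size()];
    int index = 0;
    for (Object element : collection) {
      // Skip nulls and values whose string form is empty.
      if (element != null && !element.toString().isEmpty()) {
        array[index++] = element;
      }
    }
    if (index == 0) {
      return null;
    }
    // Trim the array only if some elements were dropped.
    return index == array.length ? array : Arrays.copyOf(array, index);
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(dropNullAndEmpty(Arrays.asList("a", "", null, 1))));
  }
}
```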






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r493140341



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -27,8 +28,9 @@
  * 2) Collections become Object[] i.e. multi-value column
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
+ * @param <V> The input record's field to be converted

Review comment:
       Recommend not adding this generic type `V`, as in most cases it is `Object` (a field can have many different types within the same file format)

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -27,8 +28,9 @@
  * 2) Collections become Object[] i.e. multi-value column
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
+ * @param <V> The input record's field to be converted
  */
-public interface RecordExtractor<T> {
+public interface RecordExtractor<T, V> {
 
   /**
    * Initialize the record extractor with its config

Review comment:
       Put the `@Nullable` annotation before `fields`

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {

Review comment:
       Suggest renaming it to `BaseRecordExtractor`

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format.
+   */
+  @Nullable
+  protected abstract Object convertRecord(V value);
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = convert(element);
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      convertedMap.put(convertSingleValue(key), convert(map.get(key)));
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       The argument will never be null

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value

Review comment:
       We should return `byte[]` instead of `ByteBuffer`
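
       One possible way to do the `ByteBuffer` to `byte[]` conversion, as a sketch. The helper name
       `toBytes` is hypothetical; copying through `duplicate()` is a design choice here so that the
       caller's buffer position is left untouched:

```java
import java.nio.ByteBuffer;

// Hypothetical helper; only standard java.nio calls are used.
final class ByteBufferSketch {

  static byte[] toBytes(ByteBuffer buffer) {
    // duplicate() shares the content but has its own position and limit.
    ByteBuffer copy = buffer.duplicate();
    byte[] bytes = new byte[copy.remaining()];
    copy.get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3});
    System.out.println(toBytes(buffer).length);
  }
}
```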

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       We might not want to pass `null` into `convert()`. Check the value before calling `convert()`.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       Are we returning `null` for empty array/collection/map? If so, let's add the behavior to the javadoc and annotate the return value as nullable

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).

Review comment:
       Not sure how much value this generic type `V` can provide. IMO `convertRecord(Object value)` should be good enough (similar to `convertMap(Object value)` etc.)

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);

Review comment:
       Return `false` for default implementation?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format.
+   */
+  @Nullable
+  protected abstract Object convertRecord(V value);

Review comment:
       Throw `UnsupportedOperationException` for default implementation?
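
A rough sketch of how the two default implementations suggested for `isInstanceOfRecord` and `convertRecord` could fit together, assuming `convertRecord` takes a plain `Object` rather than the generic `V`. `BaseExtractor` and `IncompleteExtractor` are hypothetical stand-ins, and the collection and map branches of `convert` are left out:

```java
class DefaultRecordHooksSketch {

  /** Hypothetical stand-in for the abstract extractor; only the record hooks are shown. */
  static class BaseExtractor {

    /** Default: the format has no nested record type. */
    protected boolean isInstanceOfRecord(Object value) {
      return false;
    }

    /** Default: only reachable when a subclass overrides isInstanceOfRecord but not this method. */
    protected Object convertRecord(Object value) {
      throw new UnsupportedOperationException("Nested records are not supported");
    }

    /** Simplified dispatch: the collection and map branches are omitted. */
    Object convert(Object value) {
      if (value == null) {
        return null;
      }
      return isInstanceOfRecord(value) ? convertRecord(value) : value;
    }
  }

  /** Hypothetical format that declares a record type (any StringBuilder) but forgets convertRecord. */
  static class IncompleteExtractor extends BaseExtractor {
    @Override
    protected boolean isInstanceOfRecord(Object value) {
      return value instanceof StringBuilder;
    }
  }

  public static void main(String[] args) {
    System.out.println(new BaseExtractor().convert("plain"));
    try {
      new IncompleteExtractor().convert(new StringBuilder("record"));
    } catch (UnsupportedOperationException e) {
      System.out.println("unsupported: " + e.getMessage());
    }
  }
}
```

Extractors for formats without nested records would then need no overrides at all, while a half-finished override fails loudly instead of silently passing the raw record through.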




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503034028



##########
File path: pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
##########
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.thrift;
+
+import com.google.common.collect.Sets;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+
+/**
+ * Tests for the {@link ThriftRecordExtractor}
+ */
+public class ThriftRecordExtractorTest extends AbstractRecordExtractorTest {
+
+  private File _tempFile = new File(_tempDir, "test_complex_thrift.data");
+
+  private static final String INT_FIELD = "intField";
+  private static final String LONG_FIELD = "longField";
+  private static final String BOOL_FIELD = "booleanField";
+  private static final String DOUBLE_FIELD = "doubleField";
+  private static final String STRING_FIELD = "stringField";
+  private static final String ENUM_FIELD = "enumField";
+  private static final String OPTIONAL_STRING_FIELD = "optionalStringField";
+  private static final String NESTED_STRUCT_FIELD = "nestedStructField";
+  private static final String SIMPLE_LIST = "simpleListField";
+  private static final String COMPLEX_LIST = "complexListField";
+  private static final String SIMPLE_MAP = "simpleMapField";
+  private static final String COMPLEX_MAP = "complexMapField";
+  private static final String NESTED_STRING_FIELD = "nestedStringField";
+  private static final String NESTED_INT_FIELD = "nestedIntField";
+
+  @Override
+  protected List<Map<String, Object>> getInputRecords() {
+    return Arrays.asList(createRecord1(), createRecord2());
+  }
+
+  @Override
+  protected Set<String> getSourceFields() {
+    return Sets.newHashSet(INT_FIELD, LONG_FIELD, BOOL_FIELD, DOUBLE_FIELD, STRING_FIELD, ENUM_FIELD,
+        OPTIONAL_STRING_FIELD, NESTED_STRUCT_FIELD, SIMPLE_LIST, COMPLEX_LIST, SIMPLE_MAP, COMPLEX_MAP);
+  }
+
+  /**
+   * Creates a ThriftRecordReader
+   */
+  @Override
+  protected RecordReader createRecordReader(Set<String> fieldsToRead)
+      throws IOException {
+    ThriftRecordReader recordReader = new ThriftRecordReader();
+    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
+    return recordReader;
+  }
+
+  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
+    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
+    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ComplexTypes");
+    return config;
+  }
+
+  /**
+   * Create a data input file using input records containing various Thrift record types
+   */
+  @Override
+  protected void createInputFile()
+      throws IOException {
+    List<ComplexTypes> thriftRecords = new ArrayList<>(2);
+
+    for (Map<String, Object> inputRecord : _inputRecords) {
+      ComplexTypes thriftRecord = new ComplexTypes();
+      thriftRecord.setIntField((int) inputRecord.get(INT_FIELD));
+      thriftRecord.setLongField((long) inputRecord.get(LONG_FIELD));
+
+      Map<String, Object> nestedStructValues = (Map<String, Object>) inputRecord.get(NESTED_STRUCT_FIELD);
+      thriftRecord.setNestedStructField(createNestedType(
+          (String) nestedStructValues.get(NESTED_STRING_FIELD),
+          (int) nestedStructValues.get(NESTED_INT_FIELD))
+      );
+
+      thriftRecord.setSimpleListField((List<String>) inputRecord.get(SIMPLE_LIST));
+
+      List<NestedType> nestedTypeList = new ArrayList<>();
+      for (Map element : (List<Map>) inputRecord.get(COMPLEX_LIST)) {
+        nestedTypeList.add(createNestedType((String) element.get(NESTED_STRING_FIELD),
+            (Integer) element.get(NESTED_INT_FIELD)));
+      }
+
+      thriftRecord.setComplexListField(nestedTypeList);
+      thriftRecord.setBooleanField(Boolean.valueOf((String) inputRecord.get(BOOL_FIELD)));
+      thriftRecord.setDoubleField((Double) inputRecord.get(DOUBLE_FIELD));
+      thriftRecord.setStringField((String) inputRecord.get(STRING_FIELD));
+      thriftRecord.setEnumField(TestEnum.valueOf((String) inputRecord.get(ENUM_FIELD)));
+      thriftRecord.setSimpleMapField((Map<String, Integer>) inputRecord.get(SIMPLE_MAP));
+
+      Map<String, NestedType> complexMap = new HashMap<>();
+      for (Map.Entry<String, Map<String, Object>> entry :
+          ((Map<String, Map<String, Object>>) inputRecord.get(COMPLEX_MAP)).entrySet()) {
+        complexMap.put(entry.getKey(), createNestedType(
+            (String) entry.getValue().get(NESTED_STRING_FIELD),
+            (int) entry.getValue().get(NESTED_INT_FIELD)));
+      }
+      thriftRecord.setComplexMapField(complexMap);
+      thriftRecords.add(thriftRecord);
+    }
+
+    BufferedOutputStream bufferedOut = new BufferedOutputStream(new FileOutputStream(_tempFile));
+    TBinaryProtocol binaryOut = new TBinaryProtocol(new TIOStreamTransport(bufferedOut));
+    for (ComplexTypes record : thriftRecords) {
+      try {
+        record.write(binaryOut);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+    bufferedOut.close();
+  }
+
+  private Map<String, Object> createRecord1() {
+    Map<String, Object> record = new HashMap<>();
+    record.put(STRING_FIELD, "hello");
+    record.put(INT_FIELD, 10);
+    record.put(LONG_FIELD, 1000L);
+    record.put(DOUBLE_FIELD, 1.0);
+    record.put(BOOL_FIELD, "false");
+    record.put(ENUM_FIELD, TestEnum.DELTA.toString());
+    record.put(NESTED_STRUCT_FIELD, createNestedMap(NESTED_STRING_FIELD, "ice cream", NESTED_INT_FIELD, 5));
+    record.put(SIMPLE_LIST, Arrays.asList("aaa", "bbb", "ccc"));
+    record.put(COMPLEX_LIST,
+        Arrays.asList(
+            createNestedMap(NESTED_STRING_FIELD, "hows", NESTED_INT_FIELD, 10),
+            createNestedMap(NESTED_STRING_FIELD, "it", NESTED_INT_FIELD, 20),
+            createNestedMap(NESTED_STRING_FIELD, "going", NESTED_INT_FIELD, 30)
+        )
+    );
+    record.put(SIMPLE_MAP, createNestedMap("Tuesday", 3, "Wednesday", 4));
+    record.put(
+        COMPLEX_MAP,
+        createNestedMap(
+            "fruit1", createNestedMap(NESTED_STRING_FIELD, "apple", NESTED_INT_FIELD, 1),
+            "fruit2", createNestedMap(NESTED_STRING_FIELD, "orange", NESTED_INT_FIELD, 2)
+        )
+    );
+    return record;
+  }
+
+  private Map<String, Object> createRecord2() {
+    Map<String, Object> record = new HashMap<>();
+    record.put(STRING_FIELD, "world");
+    record.put(INT_FIELD, 20);
+    record.put(LONG_FIELD, 2000L);
+    record.put(DOUBLE_FIELD, 2.0);
+    record.put(BOOL_FIELD, "false");
+    record.put(ENUM_FIELD, TestEnum.GAMMA.toString());
+    record.put(NESTED_STRUCT_FIELD, createNestedMap(NESTED_STRING_FIELD, "ice cream", NESTED_INT_FIELD, 5));
+    record.put(SIMPLE_LIST, Arrays.asList("aaa", "bbb", "ccc"));
+    record.put(COMPLEX_LIST,
+        Arrays.asList(
+            createNestedMap(NESTED_STRING_FIELD, "hows", NESTED_INT_FIELD, 10),
+            createNestedMap(NESTED_STRING_FIELD, "it", NESTED_INT_FIELD, 20),
+            createNestedMap(NESTED_STRING_FIELD, "going", NESTED_INT_FIELD, 30)
+        )
+    );
+    record.put(SIMPLE_MAP, createNestedMap("Tuesday", 3, "Wednesday", 4));
+    record.put(
+        COMPLEX_MAP,
+        createNestedMap(
+            "fruit1", createNestedMap(NESTED_STRING_FIELD, "apple", NESTED_INT_FIELD, 1),
+            "fruit2", createNestedMap(NESTED_STRING_FIELD, "orange", NESTED_INT_FIELD, 2)
+        )
+    );
+    return record;
+  }
+
+  private Map<String, Object> createNestedMap(String key1, Object value1, String key2, Object value2) {
+    Map<String, Object> nestedMap = new HashMap<>(2);
+    nestedMap.put(key1, value1);
+    nestedMap.put(key2, value2);
+    return nestedMap;
+  }
+
+  private NestedType createNestedType(String stringField, int intField) {
+    NestedType nestedRecord = new NestedType();
+    nestedRecord.setNestedStringField(stringField);
+    nestedRecord.setNestedIntField(intField);
+    return nestedRecord;
+  }
+
+  @Override
+  protected boolean testExtractAll() {

Review comment:
       Good call. We can remove this method now, and the `extract all` code path will then be exercised by all the tests.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502595891



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
##########
@@ -95,8 +95,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+
     init();
+
+    if (fieldsToRead == null || fieldsToRead.isEmpty()) {

Review comment:
       I see that in the CSV case, where fieldsToRead is null, we determine the fields to read in the RecordReader, whereas in the JSON/Avro case we used to make that decision inside the RecordExtractor.
   Is it possible to keep these consistent and always let the RecordExtractor make this decision?






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503031500



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
##########
@@ -95,8 +95,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+
     init();
+
+    if (fieldsToRead == null || fieldsToRead.isEmpty()) {

Review comment:
       I had the same thought and was debating whether or not to follow the same pattern as the other extractors. I eventually decided to put the "read all fields" logic in the CSVRecordReader because the field names are accessible only through the CSV header and not in the record object being passed to the `extract` method.
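
To illustrate the reasoning, here is a small self-contained sketch of resolving the field set from the header on the reader side. `resolveFields`, its parameters, and the plain-string header are hypothetical simplifications; the actual reader works with a CSV parser rather than `String.split`:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

class CsvFieldsSketch {

  /**
   * Returns the requested fields, or every header column when none were requested.
   * Only the reader sees the header, so the fallback has to be resolved here
   * before the extractor is initialized.
   */
  static Set<String> resolveFields(Set<String> fieldsToRead, String headerLine, String delimiter) {
    if (fieldsToRead != null && !fieldsToRead.isEmpty()) {
      return fieldsToRead;
    }
    // Quote the delimiter so characters such as '|' are not treated as regex syntax.
    String[] columns = headerLine.split(Pattern.quote(delimiter));
    return new LinkedHashSet<>(Arrays.asList(columns));
  }

  public static void main(String[] args) {
    System.out.println(resolveFields(null, "id,name,tags", ","));
    System.out.println(resolveFields(new LinkedHashSet<>(Arrays.asList("id")), "id,name,tags", ","));
  }
}
```

The extractor can then be initialized with the resolved set, so its own `extract` logic never needs access to the header.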






[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r496310227



##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {
+      StructColumnVector structColumnVector = (StructColumnVector) columnVector;
+      if (!structColumnVector.isNull[rowId]) {
+        List<TypeDescription> childrenFieldTypes = fieldType.getChildren();
+        List<String> childrenFieldNames = fieldType.getFieldNames();
+
+        Map<Object, Object> convertedMap = new HashMap<>();
+        for (int i = 0; i < childrenFieldNames.size(); i++) {
+          convertedMap.put(childrenFieldNames.get(i),
+              extractValue(childrenFieldNames.get(i), structColumnVector.fields[i], childrenFieldTypes.get(i), rowId));
         }
-      } else if (category == TypeDescription.Category.MAP) {
-        // Map field
-        List<TypeDescription> children = fieldType.getChildren();
-        TypeDescription.Category keyCategory = children.get(0).getCategory();
-        TypeDescription.Category valueCategory = children.get(1).getCategory();
-        MapColumnVector mapColumnVector = (MapColumnVector) _rowBatch.cols[i];
-        int rowId = mapColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((mapColumnVector.noNulls || !mapColumnVector.isNull[rowId])) {
-          int offset = (int) mapColumnVector.offsets[rowId];
-          int length = (int) mapColumnVector.lengths[rowId];
-          Map<Object, Object> map = new HashMap<>();
-          for (int j = 0; j < length; j++) {
-            int childRowId = offset + j;
-            Object key = extractSingleValue(field, mapColumnVector.keys, childRowId, keyCategory);
-            Object value = extractSingleValue(field, mapColumnVector.values, childRowId, valueCategory);
-            map.put(key, value);
+        return convertedMap;
+      } else {
+        return null;
+      }
+    } else if (category == TypeDescription.Category.LIST) {
+      TypeDescription childType = fieldType.getChildren().get(0);
+      ListColumnVector listColumnVector = (ListColumnVector) columnVector;
+      if (columnVector.isRepeating) {
+        rowId = 0;
+      }
+      if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
+        int offset = (int) listColumnVector.offsets[rowId];
+        int length = (int) listColumnVector.lengths[rowId];
+        List<Object> values = new ArrayList<>(length);
+        for (int j = 0; j < length; j++) {
+          Object value = extractValue(field, listColumnVector.child, childType,offset + j);
+          // NOTE: Only keep non-null values
+          if (value != null) {
+            values.add(value);
           }
-          reuse.putValue(field, map);
+        }
+        if (!values.isEmpty()) {
+          return values.toArray();
         } else {
-          reuse.putValue(field, null);
+          // NOTE: Treat empty list as null
+          return null;
         }
       } else {
-        // Single-value field
-        reuse.putValue(field, extractSingleValue(field, _rowBatch.cols[i], _nextRowId, category));
+        return null;
       }
-    }
+    } else if (category == TypeDescription.Category.MAP) {

Review comment:
       Same here.

##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {
+      StructColumnVector structColumnVector = (StructColumnVector) columnVector;
+      if (!structColumnVector.isNull[rowId]) {
+        List<TypeDescription> childrenFieldTypes = fieldType.getChildren();
+        List<String> childrenFieldNames = fieldType.getFieldNames();
+
+        Map<Object, Object> convertedMap = new HashMap<>();
+        for (int i = 0; i < childrenFieldNames.size(); i++) {
+          convertedMap.put(childrenFieldNames.get(i),
+              extractValue(childrenFieldNames.get(i), structColumnVector.fields[i], childrenFieldTypes.get(i), rowId));
         }
-      } else if (category == TypeDescription.Category.MAP) {
-        // Map field
-        List<TypeDescription> children = fieldType.getChildren();
-        TypeDescription.Category keyCategory = children.get(0).getCategory();
-        TypeDescription.Category valueCategory = children.get(1).getCategory();
-        MapColumnVector mapColumnVector = (MapColumnVector) _rowBatch.cols[i];
-        int rowId = mapColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((mapColumnVector.noNulls || !mapColumnVector.isNull[rowId])) {
-          int offset = (int) mapColumnVector.offsets[rowId];
-          int length = (int) mapColumnVector.lengths[rowId];
-          Map<Object, Object> map = new HashMap<>();
-          for (int j = 0; j < length; j++) {
-            int childRowId = offset + j;
-            Object key = extractSingleValue(field, mapColumnVector.keys, childRowId, keyCategory);
-            Object value = extractSingleValue(field, mapColumnVector.values, childRowId, valueCategory);
-            map.put(key, value);
+        return convertedMap;
+      } else {
+        return null;
+      }
+    } else if (category == TypeDescription.Category.LIST) {

Review comment:
       Same here.

##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -72,7 +73,7 @@
   private int _nextRowId;
 
   @Override
-  public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)

Review comment:
       Can we have an extractor for ORC as the other RecordReaders do?

##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {

Review comment:
       You can still use a method you added, such as `isInstanceOfRecord`, here. So in the method:
   ```
     protected boolean isInstanceOfRecord(Object value) {
       return (TypeDescription.Category) value == TypeDescription.Category.STRUCT;
     }
   ```

##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {

Review comment:
       Also, please adjust the order of the checks so that it is consistent across all extractors (first check collection, then map, then record, and finally single value).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502576251



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       Looks like you have put `if (value != null)` almost everywhere before calling convert, so this indeed won't be null.
   However, I prefer the way it used to be: no null checks in every single RecordExtractor, with null handled here instead, so we have fewer `if not null` lines.
   But I'll leave it up to you.
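
The trade-off described in this comment can be sketched as follows. This is a minimal illustration, not the Pinot code: `NullHandlingSketch` and its methods are hypothetical stand-ins, and the single-value conversion is an assumption made for the example. The null check lives once in `convert`, so callers do not need their own `if (value != null)` guard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; names and behavior are assumptions, not the Pinot implementation.
class NullHandlingSketch {

  // Single entry point: null is handled here, so callers need no guard of their own.
  static Object convert(Object value) {
    if (value == null) {
      return null;
    }
    return convertSingleValue(value);
  }

  // Assumed conversion for illustration: numbers pass through, anything else becomes a String.
  static Object convertSingleValue(Object value) {
    return value instanceof Number ? value : value.toString();
  }

  // A caller can loop over elements without checking each one for null first.
  static List<Object> convertAll(List<Object> values) {
    List<Object> converted = new ArrayList<>();
    for (Object value : values) {
      Object result = convert(value);
      if (result != null) {
        converted.add(result);
      }
    }
    return converted;
  }

  public static void main(String[] args) {
    System.out.println(convertAll(Arrays.asList(1, null, "a")));
  }
}
```

The cost of this design is one extra branch per call; the benefit is that each format-specific extractor stays free of repeated null checks.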






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r495615360



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/AbstractDefaultRecordExtractor.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ * @param <V> value used for converting the nested/complex fields of the file format (e.g. GenericRecord for Avro).
+ *            In most cases, this will be the same type as {@code T}.
+ */
+public abstract class AbstractDefaultRecordExtractor<T, V> implements RecordExtractor<T, Object> {
+
+  /**
+   * Converts the field value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map.
+   * Returns {@code null} if the field value is {@code null}.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord((V) value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type.
+   */
+  protected abstract boolean isInstanceOfRecord(Object value);
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format.
+   */
+  @Nullable
+  protected abstract Object convertRecord(V value);
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = convert(element);
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      convertedMap.put(convertSingleValue(key), convert(map.get(key)));
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       This method is called to convert map keys without checking for `null`. I believe it is possible to have a `null` map key, but do we want to accept this case?
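
For context on the question above, `java.util.HashMap` does permit a single `null` key, so the case can occur for inputs backed by a `HashMap`. The sketch below shows one possible policy, skipping the entry whose key is `null`. The class `NullMapKeySketch` and the method `dropNullKey` are hypothetical names used only for this illustration; whether to skip, reject, or keep such entries is the open design question, not something this sketch settles.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the Pinot implementation.
class NullMapKeySketch {

  // Copies the map, skipping any entry whose key is null (one possible policy, assumed here).
  static Map<Object, Object> dropNullKey(Map<Object, Object> input) {
    Map<Object, Object> result = new HashMap<>();
    for (Map.Entry<Object, Object> entry : input.entrySet()) {
      if (entry.getKey() != null) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<Object, Object> input = new HashMap<>();
    input.put(null, "orphan"); // HashMap accepts one null key
    input.put("k", "v");
    System.out.println(input.containsKey(null));
    System.out.println(dropNullKey(input));
  }
}
```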






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503027842



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;

Review comment:
       Yes, I'll add that. It will be consistent with the handling of multi-values.






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r504411649



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
##########
@@ -95,8 +95,13 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+
     init();
+
+    if (fieldsToRead == null || fieldsToRead.isEmpty()) {

Review comment:
       After syncing over Slack, we decided to keep the decision to extract all fields inside the extractor, in case any other client calls the extractor directly.
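
A minimal sketch of that decision, under stated assumptions: `ExtractAllFieldsSketch`, its `init` and `extract` methods, and the use of a plain `Map` as the input record are all hypothetical and do not mirror Pinot's extractor API. The point is only that the extractor itself interprets a `null` or empty `fieldsToRead` as "extract every field", so a caller that bypasses the record reader gets the same behavior.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; class and method names are assumptions, not Pinot's extractor API.
class ExtractAllFieldsSketch {
  private Set<String> fields;
  private boolean extractAll;

  // The extractor decides: a null or empty set means "extract every field".
  void init(Set<String> fieldsToRead) {
    extractAll = fieldsToRead == null || fieldsToRead.isEmpty();
    if (extractAll) {
      fields = Collections.emptySet();
    } else {
      fields = new HashSet<>(fieldsToRead);
    }
  }

  // Copies either every field or only the requested ones into the output row.
  Map<String, Object> extract(Map<String, Object> record) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : record.entrySet()) {
      if (extractAll || fields.contains(entry.getKey())) {
        row.put(entry.getKey(), entry.getValue());
      }
    }
    return row;
  }
}
```

With this shape, a reader can pass its `fieldsToRead` argument through unchanged and does not need to resolve the full field list before calling `init`.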






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r506045817



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   *
+   * @param value should be verified to be a record type prior to calling this method as it will be handled with this
+   *              assumption
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   *
+   * @param value should be verified to be a Collection type prior to calling this method as it will be casted
+   *              to a Collection without checking
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   *
+   * @param value should be verified to be a Map type prior to calling this method as it will be casted to a Map
+   *              without checking
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map<Object, Object> map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object mapKey = entry.getKey();
+      Object mapValue = entry.getValue();
+      if (mapKey != null) {
+        Object convertedMapValue = null;
+        if (mapValue != null) {
+          convertedMapValue = convert(mapValue);
+        }
+
+        if (convertedMapValue != null) {
+          convertedMap.put(convertSingleValue(entry.getKey()), convertedMapValue);
+        }
+      }
+    }
+
+    if (convertedMap.isEmpty()) {
+      return null;
+    }
+
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values.
+   */
+  protected Object convertSingleValue(Object value) {
+    if (value instanceof ByteBuffer) {
+      ByteBuffer byteBufferValue = (ByteBuffer) value;
+
+      // Use byteBufferValue.remaining() instead of byteBufferValue.capacity() so that it still works when buffer is
+      // over-sized
+      byte[] bytesValue = new byte[byteBufferValue.remaining()];
+      byteBufferValue.get(bytesValue);
+      return bytesValue;
+    }
+    if (value instanceof Number) {

Review comment:
       Yeah, that makes sense given that the ByteBuffer is converted to byte[].
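
For context on the ByteBuffer branch in the diff quoted above, here is a minimal sketch of why `remaining()` rather than `capacity()` sizes the output array. The class and method names are hypothetical and not part of the PR; only the three conversion lines mirror the quoted code.

```java
import java.nio.ByteBuffer;

final class ByteBufferConversionSketch {
  // Hypothetical helper mirroring the ByteBuffer branch of convertSingleValue.
  static byte[] toBytes(ByteBuffer buffer) {
    // remaining() counts only the bytes between position and limit,
    // so an over-sized backing buffer does not pad the result.
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes); // note: this advances the buffer's position
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(16); // capacity is 16
    buffer.put(new byte[] {1, 2, 3});
    buffer.flip();                               // position 0, limit 3
    System.out.println(toBytes(buffer).length);  // prints 3, not 16
  }
}
```

Because `get(byte[])` consumes the buffer, a caller that needs to read the same buffer again would probably want to pass `buffer.duplicate()` instead.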




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r504422005



##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {

Review comment:
       Resolving since ORC extraction is different from the other extractors.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502588825



##########
File path: pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
##########
@@ -49,13 +51,53 @@ public GenericRow extract(GenericRecord from, GenericRow to) {
       List<Schema.Field> fields = from.getSchema().getFields();
       for (Schema.Field field : fields) {
         String fieldName = field.name();
-        to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
+        Object value = from.get(fieldName);
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
       }
     } else {
       for (String fieldName : _fields) {
-        to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
+        Object value = from.get(fieldName);
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
       }
     }
     return to;
   }
+
+  /**
+   * Returns whether the object is an Avro GenericRecord.
+   */
+  @Override
+  protected boolean isInstanceOfRecord(Object value) {
+    return value instanceof GenericRecord;
+  }
+
+  /**
+   * Handles the conversion of every field of the Avro GenericRecord.
+   */
+  @Override
+  @Nullable
+  protected Object convertRecord(Object value) {

Review comment:
       For this method (and similarly for all the ones in BaseRecordExtractor), please add a javadoc for `value`.
   Previously we cast upfront and the param was declared directly as the type (Collection, GenericRecord, etc.). Now we expect the right type to be provided in `value` and cast here without any check, so it would be nice to describe `value` for all the methods of this nature.
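
To illustrate the contract being asked for, here is a minimal sketch, assuming a simplified stand-in for the extractor (the class name is hypothetical, and single values pass through unchanged here, unlike in the PR). The type check lives in the dispatcher, and the method that casts documents its precondition in the `@param` tag.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class CheckedDispatchSketch {
  /** The type check happens here, once, before delegating. */
  static Object convert(Object value) {
    if (value instanceof Collection) {
      return convertMultiValue(value);
    }
    return value; // single values pass through in this sketch
  }

  /**
   * Converts each element and drops nulls; returns null if nothing is left.
   *
   * @param value must already be verified to be a Collection, since it is cast without checking
   */
  static Object convertMultiValue(Object value) {
    Collection<?> collection = (Collection<?>) value;
    List<Object> converted = new ArrayList<>(collection.size());
    for (Object element : collection) {
      if (element != null) {
        Object convertedElement = convert(element);
        if (convertedElement != null) {
          converted.add(convertedElement);
        }
      }
    }
    return converted.isEmpty() ? null : converted.toArray();
  }
}
```

Calling `convertMultiValue` directly with a non-Collection argument throws a `ClassCastException`, which is exactly the situation the requested javadoc is meant to warn about.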






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r503027108



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);

Review comment:
       Makes sense. I'll add `null` checks for the map keys and values so that they are not inserted into the map.
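
A minimal sketch of the null handling described in this reply, assuming a standalone helper (the class name is hypothetical, and the recursive `convert` call on values is left out to keep the example self-contained): entries with a null key or a null value are skipped, and a map with nothing left becomes `null`.

```java
import java.util.HashMap;
import java.util.Map;

final class NullSafeMapSketch {
  // Copies only entries whose key and value are both non-null.
  static Map<Object, Object> convertMap(Map<?, ?> map) {
    Map<Object, Object> converted = new HashMap<>();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      Object key = entry.getKey();
      Object value = entry.getValue();
      if (key != null && value != null) {
        converted.put(key, value);
      }
    }
    // An empty result is reported as null, matching the empty-collection convention.
    return converted.isEmpty() ? null : converted;
  }
}
```

Iterating over `entrySet()` instead of `keySet()` also avoids a second lookup per key.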






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r496431862



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
##########
@@ -46,4 +48,16 @@
    * @return The output GenericRow
    */
   GenericRow extract(T from, GenericRow to);
+
+  /**
+   * Converts a field of the given input record. The field value will be converted to either a single value
+   * (string, number, bytebuffer), multi value (Object[]) or a Map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it in the transformation layer.
+   *
+   * @param value the field value to be converted
+   * @return The converted field value
+   */
+  Object convert(@Nullable V value);

Review comment:
       I prefer handling `null` explicitly before calling the method, for readability and slightly better performance (it saves a method call). This is personal preference, though, so either way is fine.
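
The caller-side check suggested here could look roughly like the sketch below. The names are hypothetical, and `convert` is a stand-in that simply assumes a non-null argument.

```java
final class CallerSideNullCheckSketch {
  // Hypothetical converter that assumes its argument is non-null.
  static Object convert(Object value) {
    return value.toString();
  }

  // The caller guards against null, so convert() is never invoked for missing fields.
  static Object extractField(Object rawValue) {
    return rawValue != null ? convert(rawValue) : null;
  }
}
```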






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502591789



##########
File path: pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
##########
@@ -69,4 +70,32 @@ public GenericRow extract(CSVRecord from, GenericRow to) {
     }
     return to;
   }
+
+  @Override
+  @Nullable
+  public Object convert(@Nullable Object value) {

Review comment:
       I think you missed removing this code from `extract` and calling `convert` there instead.






[GitHub] [incubator-pinot] timsants closed pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants closed pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046


   




[GitHub] [incubator-pinot] timsants commented on pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#issuecomment-708178631


   > shouldn't `CSVRecordExtractor` also extend `BaseRecordExtractor` abstract class instead of implementing the `RecordExtractor` interface?
   
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502624766



##########
File path: pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
##########
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.thrift;
+
+import com.google.common.collect.Sets;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+
+/**
+ * Tests for the {@link ThriftRecordExtractor}
+ */
+public class ThriftRecordExtractorTest extends AbstractRecordExtractorTest {
+
+  private File _tempFile = new File(_tempDir, "test_complex_thrift.data");
+
+  private static final String INT_FIELD = "intField";
+  private static final String LONG_FIELD = "longField";
+  private static final String BOOL_FIELD = "booleanField";
+  private static final String DOUBLE_FIELD = "doubleField";
+  private static final String STRING_FIELD = "stringField";
+  private static final String ENUM_FIELD = "enumField";
+  private static final String OPTIONAL_STRING_FIELD = "optionalStringField";
+  private static final String NESTED_STRUCT_FIELD = "nestedStructField";
+  private static final String SIMPLE_LIST = "simpleListField";
+  private static final String COMPLEX_LIST = "complexListField";
+  private static final String SIMPLE_MAP = "simpleMapField";
+  private static final String COMPLEX_MAP = "complexMapField";
+  private static final String NESTED_STRING_FIELD = "nestedStringField";
+  private static final String NESTED_INT_FIELD = "nestedIntField";
+
+  @Override
+  protected List<Map<String, Object>> getInputRecords() {
+    return Arrays.asList(createRecord1(), createRecord2());
+  }
+
+  @Override
+  protected Set<String> getSourceFields() {
+    return Sets.newHashSet(INT_FIELD, LONG_FIELD, BOOL_FIELD, DOUBLE_FIELD, STRING_FIELD, ENUM_FIELD,
+        OPTIONAL_STRING_FIELD, NESTED_STRUCT_FIELD, SIMPLE_LIST, COMPLEX_LIST, SIMPLE_MAP, COMPLEX_MAP);
+  }
+
+  /**
+   * Creates a ThriftRecordReader
+   */
+  @Override
+  protected RecordReader createRecordReader(Set<String> fieldsToRead)
+      throws IOException {
+    ThriftRecordReader recordReader = new ThriftRecordReader();
+    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
+    return recordReader;
+  }
+
+  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
+    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
+    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ComplexTypes");
+    return config;
+  }
+
+  /**
+   * Create a data input file using input records containing various Thrift record types
+   */
+  @Override
+  protected void createInputFile()
+      throws IOException {
+    List<ComplexTypes> thriftRecords = new ArrayList<>(2);
+
+    for (Map<String, Object> inputRecord : _inputRecords) {
+      ComplexTypes thriftRecord = new ComplexTypes();
+      thriftRecord.setIntField((int) inputRecord.get(INT_FIELD));
+      thriftRecord.setLongField((long) inputRecord.get(LONG_FIELD));
+
+      Map<String, Object> nestedStructValues = (Map<String, Object>) inputRecord.get(NESTED_STRUCT_FIELD);
+      thriftRecord.setNestedStructField(createNestedType(
+          (String) nestedStructValues.get(NESTED_STRING_FIELD),
+          (int) nestedStructValues.get(NESTED_INT_FIELD))
+      );
+
+      thriftRecord.setSimpleListField((List<String>) inputRecord.get(SIMPLE_LIST));
+
+      List<NestedType> nestedTypeList = new ArrayList<>();
+      for (Map element : (List<Map>) inputRecord.get(COMPLEX_LIST)) {
+        nestedTypeList.add(createNestedType((String) element.get(NESTED_STRING_FIELD),
+            (Integer) element.get(NESTED_INT_FIELD)));
+      }
+
+      thriftRecord.setComplexListField(nestedTypeList);
+      thriftRecord.setBooleanField(Boolean.valueOf((String) inputRecord.get(BOOL_FIELD)));
+      thriftRecord.setDoubleField((Double) inputRecord.get(DOUBLE_FIELD));
+      thriftRecord.setStringField((String) inputRecord.get(STRING_FIELD));
+      thriftRecord.setEnumField(TestEnum.valueOf((String) inputRecord.get(ENUM_FIELD)));
+      thriftRecord.setSimpleMapField((Map<String, Integer>) inputRecord.get(SIMPLE_MAP));
+
+      Map<String, NestedType> complexMap = new HashMap<>();
+      for (Map.Entry<String, Map<String, Object>> entry :
+          ((Map<String, Map<String, Object>>) inputRecord.get(COMPLEX_MAP)).entrySet()) {
+        complexMap.put(entry.getKey(), createNestedType(
+            (String) entry.getValue().get(NESTED_STRING_FIELD),
+            (int) entry.getValue().get(NESTED_INT_FIELD)));
+      }
+      thriftRecord.setComplexMapField(complexMap);
+      thriftRecords.add(thriftRecord);
+    }
+
+    BufferedOutputStream bufferedOut = new BufferedOutputStream(new FileOutputStream(_tempFile));
+    TBinaryProtocol binaryOut = new TBinaryProtocol(new TIOStreamTransport(bufferedOut));
+    for (ComplexTypes record : thriftRecords) {
+      try {
+        record.write(binaryOut);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+    bufferedOut.close();
+  }
+
+  private Map<String, Object> createRecord1() {
+    Map<String, Object> record = new HashMap<>();
+    record.put(STRING_FIELD, "hello");
+    record.put(INT_FIELD, 10);
+    record.put(LONG_FIELD, 1000L);
+    record.put(DOUBLE_FIELD, 1.0);
+    record.put(BOOL_FIELD, "false");
+    record.put(ENUM_FIELD, TestEnum.DELTA.toString());
+    record.put(NESTED_STRUCT_FIELD, createNestedMap(NESTED_STRING_FIELD, "ice cream", NESTED_INT_FIELD, 5));
+    record.put(SIMPLE_LIST, Arrays.asList("aaa", "bbb", "ccc"));
+    record.put(COMPLEX_LIST,
+        Arrays.asList(
+            createNestedMap(NESTED_STRING_FIELD, "hows", NESTED_INT_FIELD, 10),
+            createNestedMap(NESTED_STRING_FIELD, "it", NESTED_INT_FIELD, 20),
+            createNestedMap(NESTED_STRING_FIELD, "going", NESTED_INT_FIELD, 30)
+        )
+    );
+    record.put(SIMPLE_MAP, createNestedMap("Tuesday", 3, "Wednesday", 4));
+    record.put(
+        COMPLEX_MAP,
+        createNestedMap(
+            "fruit1", createNestedMap(NESTED_STRING_FIELD, "apple", NESTED_INT_FIELD, 1),
+            "fruit2", createNestedMap(NESTED_STRING_FIELD, "orange", NESTED_INT_FIELD, 2)
+        )
+    );
+    return record;
+  }
+
+  private Map<String, Object> createRecord2() {
+    Map<String, Object> record = new HashMap<>();
+    record.put(STRING_FIELD, "world");
+    record.put(INT_FIELD, 20);
+    record.put(LONG_FIELD, 2000L);
+    record.put(DOUBLE_FIELD, 2.0);
+    record.put(BOOL_FIELD, "false");
+    record.put(ENUM_FIELD, TestEnum.GAMMA.toString());
+    record.put(NESTED_STRUCT_FIELD, createNestedMap(NESTED_STRING_FIELD, "ice cream", NESTED_INT_FIELD, 5));
+    record.put(SIMPLE_LIST, Arrays.asList("aaa", "bbb", "ccc"));
+    record.put(COMPLEX_LIST,
+        Arrays.asList(
+            createNestedMap(NESTED_STRING_FIELD, "hows", NESTED_INT_FIELD, 10),
+            createNestedMap(NESTED_STRING_FIELD, "it", NESTED_INT_FIELD, 20),
+            createNestedMap(NESTED_STRING_FIELD, "going", NESTED_INT_FIELD, 30)
+        )
+    );
+    record.put(SIMPLE_MAP, createNestedMap("Tuesday", 3, "Wednesday", 4));
+    record.put(
+        COMPLEX_MAP,
+        createNestedMap(
+            "fruit1", createNestedMap(NESTED_STRING_FIELD, "apple", NESTED_INT_FIELD, 1),
+            "fruit2", createNestedMap(NESTED_STRING_FIELD, "orange", NESTED_INT_FIELD, 2)
+        )
+    );
+    return record;
+  }
+
+  private Map<String, Object> createNestedMap(String key1, Object value1, String key2, Object value2) {
+    Map<String, Object> nestedMap = new HashMap<>(2);
+    nestedMap.put(key1, value1);
+    nestedMap.put(key2, value2);
+    return nestedMap;
+  }
+
+  private NestedType createNestedType(String stringField, int intField) {
+    NestedType nestedRecord = new NestedType();
+    nestedRecord.setNestedStringField(stringField);
+    nestedRecord.setNestedIntField(intField);
+    return nestedRecord;
+  }
+
+  @Override
+  protected boolean testExtractAll() {

Review comment:
       If this is true for all extractor tests now, can we remove this method and just default to testing extractAll?






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r497970567



##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -72,7 +73,7 @@
   private int _nextRowId;
 
   @Override
-  public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)

Review comment:
       I had the same initial thought and asked Neha the same thing.
   
   It's because ORC's columnar format doesn't quite fit the `RecordExtractor` interface. The method `GenericRow extract(T from, GenericRow to)` expects one record/row to be extracted, but the ORC record reader is unique in that it reads rows in batches. In addition, ColumnVectors have an optimization for repeating values: the first row in the row batch holds the repeating value.
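
A minimal sketch of the repeating-value lookup mentioned here. `SimpleLongVector` is a hypothetical stand-in written for this example, not the real ORC class; its field names (`isRepeating`, `noNulls`, `isNull`) follow the ones visible in the `ORCRecordReader` diff quoted earlier in this thread.

```java
final class RepeatingVectorSketch {
  /** Hypothetical stand-in for a column vector of longs within one row batch. */
  static final class SimpleLongVector {
    boolean isRepeating;    // true when every row in the batch shares one value
    boolean noNulls = true; // true when no row in the batch is null
    boolean[] isNull;       // per-row null flags, consulted only when noNulls is false
    long[] vector;          // the values; only index 0 is meaningful when repeating
  }

  // Reads the value for rowId, redirecting to row 0 when the vector is repeating.
  static Long read(SimpleLongVector column, int rowId) {
    int index = column.isRepeating ? 0 : rowId;
    if (column.noNulls || !column.isNull[index]) {
      return column.vector[index];
    }
    return null;
  }
}
```

Since the row index has to be resolved per column vector rather than per record, this logic sits more naturally in the reader than behind a one-record-at-a-time `extract` call.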






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r502576251



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Base abstract class for extracting and converting the fields of various data formats into supported Pinot data types.
+ *
+ * @param <T> the format of the input record
+ */
+public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
+
+  /**
+   * Converts the field value to either a single value (string, number, byte[]), multi value (Object[]) or a Map.
+   * Returns {@code null} if the value is an empty array/collection/map.
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform function operates on it in the transformation layer.
+   */
+  @Nullable
+  public Object convert(Object value) {
+    Object convertedValue;
+    if (isInstanceOfMultiValue(value)) {
+      convertedValue = convertMultiValue(value);
+    } else if (isInstanceOfMap(value)) {
+      convertedValue = convertMap(value);
+    } else if (isInstanceOfRecord(value)) {
+      convertedValue = convertRecord(value);
+    } else {
+      convertedValue = convertSingleValue(value);
+    }
+    return convertedValue;
+  }
+
+  /**
+   * Returns whether the object is an instance of the data format's base type. Override this method if the extractor
+   * can handle the conversion of nested record types.
+   */
+  protected boolean isInstanceOfRecord(Object value) {
+    return false;
+  }
+
+  /**
+   * Returns whether the object is of a multi-value type. Override this method if the data format represents
+   * multi-value objects differently.
+   */
+  protected boolean isInstanceOfMultiValue(Object value) {
+    return value instanceof Collection;
+  }
+
+  /**
+   * Returns whether the object is of a map type. Override this method if the data format represents map objects
+   * differently.
+   */
+  protected boolean isInstanceOfMap(Object value) {
+    return value instanceof Map;
+  }
+
+  /**
+   * Handles the conversion of every field of the object for the particular data format. Override this method if the
+   * extractor can convert nested record types.
+   */
+  @Nullable
+  protected Object convertRecord(Object value) {
+    throw new UnsupportedOperationException("Extractor cannot convert record type structures for this data format.");
+  }
+
+  /**
+   * Handles the conversion of each element of a multi-value object. Returns {@code null} if the field value is
+   * {@code null}.
+   *
+   * This implementation converts the Collection to an Object array. Override this method if the data format
+   * requires a different conversion for its multi-value objects.
+   */
+  @Nullable
+  protected Object convertMultiValue(Object value) {
+    Collection collection = (Collection) value;
+    if (collection.isEmpty()) {
+      return null;
+    }
+
+    int numValues = collection.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object element : collection) {
+      Object convertedValue = null;
+      if (element != null) {
+        convertedValue = convert(element);
+      }
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
+    }
+  }
+
+  /**
+   * Handles the conversion of every value of the map. Note that map keys will be handled as a single-value type.
+   * Returns {@code null} if the field value is {@code null}. This should be overridden if the data format requires
+   * a different conversion for map values.
+   */
+  @Nullable
+  protected Object convertMap(Object value) {
+    Map map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      Object convertedValue = null;
+      if (key != null) {
+        convertedValue = convert(map.get(key));
+      }
+      convertedMap.put(convertSingleValue(key), convertedValue);
+    }
+    return convertedMap;
+  }
+
+  /**
+   * Converts single value types. This should be overridden if the data format requires
+   * a different conversion for its single values. Returns {@code null} for {@code null} input values.
+   */
+  @Nullable
+  protected Object convertSingleValue(@Nullable Object value) {

Review comment:
       Looks like you have put `if (value != null)` almost everywhere before calling convert, so this indeed won't be null.
   However, I prefer the way it used to be: no null checks in every single RecordExtractor, with null handled here instead, so we have fewer `if not null` lines.
   But I'll leave it up to you.
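
A rough sketch of what handling null in one place could look like. This is not the code in the PR; the class name `NullSafeConvertSketch` and the body of `convertSingleValue` are assumptions made for illustration (numbers are kept, ByteBuffers become byte[], anything else is stringified).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical illustration of a single, central null check; not the PR's implementation. */
final class NullSafeConvertSketch {

  /** Entry point: the only place that checks for null. */
  static Object convert(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof Collection) {
      return convertMultiValue((Collection<?>) value);
    }
    return convertSingleValue(value);
  }

  /** Converts each element; null elements are dropped and an empty result becomes null. */
  static Object convertMultiValue(Collection<?> collection) {
    List<Object> converted = new ArrayList<>(collection.size());
    for (Object element : collection) {
      // No `if (element != null)` here: convert() already handles it.
      Object convertedElement = convert(element);
      if (convertedElement != null) {
        converted.add(convertedElement);
      }
    }
    return converted.isEmpty() ? null : converted.toArray();
  }

  /** Assumed single-value rules: Number as-is, ByteBuffer to byte[], otherwise toString(). */
  static Object convertSingleValue(Object value) {
    if (value instanceof Number) {
      return value;
    }
    if (value instanceof ByteBuffer) {
      ByteBuffer buffer = ((ByteBuffer) value).duplicate(); // leave the caller's position untouched
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      return bytes;
    }
    return value.toString();
  }
}
```

With this shape, individual extractors can call `convert(...)` on whatever the format hands back without guarding each call site.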






[GitHub] [incubator-pinot] timsants commented on a change in pull request #6046: Deep Extraction Support for ORC, Thrift, and ProtoBuf Records

Posted by GitBox <gi...@apache.org>.
timsants commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r504421904



##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {
+      StructColumnVector structColumnVector = (StructColumnVector) columnVector;
+      if (!structColumnVector.isNull[rowId]) {
+        List<TypeDescription> childrenFieldTypes = fieldType.getChildren();
+        List<String> childrenFieldNames = fieldType.getFieldNames();
+
+        Map<Object, Object> convertedMap = new HashMap<>();
+        for (int i = 0; i < childrenFieldNames.size(); i++) {
+          convertedMap.put(childrenFieldNames.get(i),
+              extractValue(childrenFieldNames.get(i), structColumnVector.fields[i], childrenFieldTypes.get(i), rowId));
         }
-      } else if (category == TypeDescription.Category.MAP) {
-        // Map field
-        List<TypeDescription> children = fieldType.getChildren();
-        TypeDescription.Category keyCategory = children.get(0).getCategory();
-        TypeDescription.Category valueCategory = children.get(1).getCategory();
-        MapColumnVector mapColumnVector = (MapColumnVector) _rowBatch.cols[i];
-        int rowId = mapColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((mapColumnVector.noNulls || !mapColumnVector.isNull[rowId])) {
-          int offset = (int) mapColumnVector.offsets[rowId];
-          int length = (int) mapColumnVector.lengths[rowId];
-          Map<Object, Object> map = new HashMap<>();
-          for (int j = 0; j < length; j++) {
-            int childRowId = offset + j;
-            Object key = extractSingleValue(field, mapColumnVector.keys, childRowId, keyCategory);
-            Object value = extractSingleValue(field, mapColumnVector.values, childRowId, valueCategory);
-            map.put(key, value);
+        return convertedMap;
+      } else {
+        return null;
+      }
+    } else if (category == TypeDescription.Category.LIST) {

Review comment:
       Resolving since ORC extraction is different from the other extractors.

##########
File path: pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) _rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, _nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category (STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {
+      StructColumnVector structColumnVector = (StructColumnVector) columnVector;
+      if (!structColumnVector.isNull[rowId]) {
+        List<TypeDescription> childrenFieldTypes = fieldType.getChildren();
+        List<String> childrenFieldNames = fieldType.getFieldNames();
+
+        Map<Object, Object> convertedMap = new HashMap<>();
+        for (int i = 0; i < childrenFieldNames.size(); i++) {
+          convertedMap.put(childrenFieldNames.get(i),
+              extractValue(childrenFieldNames.get(i), structColumnVector.fields[i], childrenFieldTypes.get(i), rowId));
         }
-      } else if (category == TypeDescription.Category.MAP) {
-        // Map field
-        List<TypeDescription> children = fieldType.getChildren();
-        TypeDescription.Category keyCategory = children.get(0).getCategory();
-        TypeDescription.Category valueCategory = children.get(1).getCategory();
-        MapColumnVector mapColumnVector = (MapColumnVector) _rowBatch.cols[i];
-        int rowId = mapColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((mapColumnVector.noNulls || !mapColumnVector.isNull[rowId])) {
-          int offset = (int) mapColumnVector.offsets[rowId];
-          int length = (int) mapColumnVector.lengths[rowId];
-          Map<Object, Object> map = new HashMap<>();
-          for (int j = 0; j < length; j++) {
-            int childRowId = offset + j;
-            Object key = extractSingleValue(field, mapColumnVector.keys, childRowId, keyCategory);
-            Object value = extractSingleValue(field, mapColumnVector.values, childRowId, valueCategory);
-            map.put(key, value);
+        return convertedMap;
+      } else {
+        return null;
+      }
+    } else if (category == TypeDescription.Category.LIST) {
+      TypeDescription childType = fieldType.getChildren().get(0);
+      ListColumnVector listColumnVector = (ListColumnVector) columnVector;
+      if (columnVector.isRepeating) {
+        rowId = 0;
+      }
+      if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
+        int offset = (int) listColumnVector.offsets[rowId];
+        int length = (int) listColumnVector.lengths[rowId];
+        List<Object> values = new ArrayList<>(length);
+        for (int j = 0; j < length; j++) {
+          Object value = extractValue(field, listColumnVector.child, childType,offset + j);
+          // NOTE: Only keep non-null values
+          if (value != null) {
+            values.add(value);
           }
-          reuse.putValue(field, map);
+        }
+        if (!values.isEmpty()) {
+          return values.toArray();
         } else {
-          reuse.putValue(field, null);
+          // NOTE: Treat empty list as null
+          return null;
         }
       } else {
-        // Single-value field
-        reuse.putValue(field, extractSingleValue(field, _rowBatch.cols[i], _nextRowId, category));
+        return null;
       }
-    }
+    } else if (category == TypeDescription.Category.MAP) {

Review comment:
       Resolving since ORC extraction is different from the other extractors.
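
For context on why it differs, a minimal sketch of how a LIST column is laid out, assuming plain arrays in place of the real ORC vectors (the class `ListSliceSketch` and its method are hypothetical). Each row stores only an offset and a length into a shared child array, so a list value has to be assembled by the reader from batch-level state rather than converted from a self-contained object.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of offset/length based list storage; not the ORC API. */
final class ListSliceSketch {

  /**
   * Returns the non-null child values belonging to one row,
   * or null if the row's list is empty (empty lists are treated as null).
   */
  static Object[] extractList(long[] offsets, long[] lengths, Object[] child, int rowId) {
    int offset = (int) offsets[rowId];
    int length = (int) lengths[rowId];
    List<Object> values = new ArrayList<>(length);
    for (int j = 0; j < length; j++) {
      Object value = child[offset + j];
      // Only keep non-null values, mirroring the behavior in the diff above.
      if (value != null) {
        values.add(value);
      }
    }
    return values.isEmpty() ? null : values.toArray();
  }
}
```

In the actual reader the child lookup is itself a recursive call, since the child vector may hold structs, maps or further lists.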



