Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/02/02 20:13:18 UTC

[GitHub] [incubator-pinot] fx19880617 opened a new pull request #6525: Adding native parquet record reader support

fx19880617 opened a new pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525


   ## Description
   Adds native Parquet record reader support.
   The reader converts:
   - Parquet INT96 timestamps (nanosecond precision) to LONG (epoch milliseconds)
   - DECIMAL values to double
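
As a rough illustration of the two conversions listed above (not the PR's actual code), the sketch below decodes an INT96 timestamp and a DECIMAL value with the JDK only. It assumes the common INT96 layout of 8 little-endian bytes of nanoseconds within the day followed by a 4-byte little-endian Julian day, and a DECIMAL stored as a big-endian two's-complement unscaled integer. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper class; names are illustrative and not taken from the PR.
class ParquetValueSketch {
  // Julian day number of 1970-01-01 (the Unix epoch).
  static final long JULIAN_DAY_OF_UNIX_EPOCH = 2440588L;
  static final long MILLIS_PER_DAY = 86_400_000L;
  static final long NANOS_PER_MILLI = 1_000_000L;

  /** Converts a 12-byte INT96 timestamp to milliseconds since the Unix epoch. */
  static long int96ToEpochMillis(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0); // bytes 0-7: nanoseconds within the day
    long julianDay = buf.getInt(8);   // bytes 8-11: Julian day number
    return (julianDay - JULIAN_DAY_OF_UNIX_EPOCH) * MILLIS_PER_DAY + nanosOfDay / NANOS_PER_MILLI;
  }

  /** Converts a big-endian two's-complement unscaled value with the given scale to a double. */
  static double decimalToDouble(byte[] unscaledBigEndian, int scale) {
    return new BigDecimal(new BigInteger(unscaledBigEndian), scale).doubleValue();
  }

  public static void main(String[] args) {
    byte[] int96 = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
        .putLong(1_500_000L) // 1.5 ms after midnight, in nanoseconds
        .putInt(2440589)     // one day after the Unix epoch
        .array();
    System.out.println(int96ToEpochMillis(int96));                  // 86400001
    System.out.println(decimalToDouble(new byte[]{0x30, 0x39}, 2)); // 123.45
  }
}
```

Sub-millisecond precision is truncated by the integer division, and converting DECIMAL to double can lose precision for values with many significant digits.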
   
   ## Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569222591



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetNativeRecordReader implements RecordReader {
+  private Path _dataFilePath;
+  private ParquetNativeRecordExtractor _recordExtractor;
+  private MessageType _schema;
+  private ParquetMetadata _parquetMetadata;
+  private ParquetFileReader _parquetFileReader;
+  private Group _nextRecord;
+  private PageReadStore _pageReadStore;
+  private MessageColumnIO _columnIO;
+  private org.apache.parquet.io.RecordReader _parquetRecordReader;
+  private int _currentPageIdx;
+
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _recordExtractor = new ParquetNativeRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _schema = _parquetMetadata.getFileMetaData().getSchema();
+    _parquetFileReader =
+        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
+            _schema.getColumns());
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _columnIO = new ColumnIOFactory().getColumnIO(_schema);
+    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    _currentPageIdx = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (_pageReadStore == null) {
+      return false;
+    }
+    if (_pageReadStore.getRowCount() - _currentPageIdx >= 1) {
+      // System.out.println("_pageReadStore.getRowCount() = " + _pageReadStore.getRowCount() + ", _currentPageIdx = " + _currentPageIdx);
+      return true;
+    }
+    try {
+      _pageReadStore = _parquetFileReader.readNextRowGroup();
+      _currentPageIdx = 0;
+      if (_pageReadStore == null) {
+        return false;
+      }
+      _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return hasNext();
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    _nextRecord = (Group) _parquetRecordReader.read();
+    _recordExtractor.extract(_nextRecord, reuse);

Review comment:
       It's not ;) I think you may be referring to the page.
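
For context on the hunk above: `hasNext()` compares `_currentPageIdx` with `_pageReadStore.getRowCount()`, and a `PageReadStore` holds one row group, so the counter effectively tracks rows within the current row group. A minimal sketch of that cursor pattern follows. It uses plain lists as a hypothetical stand-in for row groups (no Parquet classes are involved; `RowGroupCursor` and `drain` are invented names) and a loop in place of the recursive `hasNext()` call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in: each inner list plays the role of one row group.
class RowGroupCursor<T> implements Iterator<T> {
  private final Iterator<List<T>> _rowGroups;
  private List<T> _currentGroup; // null once every group is exhausted
  private int _rowIdxInGroup;    // rows already consumed from _currentGroup

  RowGroupCursor(List<List<T>> rowGroups) {
    _rowGroups = rowGroups.iterator();
    advance();
  }

  // Moves to the next group, or marks the cursor as exhausted.
  private void advance() {
    _currentGroup = _rowGroups.hasNext() ? _rowGroups.next() : null;
    _rowIdxInGroup = 0;
  }

  @Override
  public boolean hasNext() {
    // Skip groups that are empty or fully consumed.
    while (_currentGroup != null && _rowIdxInGroup >= _currentGroup.size()) {
      advance();
    }
    return _currentGroup != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return _currentGroup.get(_rowIdxInGroup++);
  }

  // Reads every row from every group, in order.
  static <T> List<T> drain(List<List<T>> rowGroups) {
    List<T> rows = new ArrayList<>();
    RowGroupCursor<T> cursor = new RowGroupCursor<>(rowGroups);
    while (cursor.hasNext()) {
      rows.add(cursor.next());
    }
    return rows;
  }

  public static void main(String[] args) {
    List<List<Integer>> groups =
        Arrays.asList(Arrays.asList(1, 2), Collections.<Integer>emptyList(), Arrays.asList(3));
    System.out.println(drain(groups)); // [1, 2, 3]
  }
}
```

Using a loop means a long run of empty groups cannot grow the call stack, and `next()` guards itself instead of relying on the caller to check `hasNext()` first.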






[GitHub] [incubator-pinot] codecov-io commented on pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#issuecomment-771975222


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=h1) Report
   > Merging [#6525](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=desc) (5a7f950) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.67%`.
   > The diff coverage is `60.09%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6525      +/-   ##
   ==========================================
   - Coverage   66.44%   64.77%   -1.68%     
   ==========================================
     Files        1075     1338     +263     
     Lines       54773    66023   +11250     
     Branches     8168     9651    +1483     
   ==========================================
   + Hits        36396    42768    +6372     
   - Misses      15700    20184    +4484     
   - Partials     2677     3071     +394     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `64.77% <60.09%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <ø> (+9.52%)` | :arrow_up: |
   | [...n/java/org/apache/pinot/client/BrokerResponse.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Jyb2tlclJlc3BvbnNlLmphdmE=) | `100.00% <ø> (ø)` | |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <ø> (-13.29%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/client/ExecutionStats.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0V4ZWN1dGlvblN0YXRzLmphdmE=) | `15.55% <ø> (ø)` | |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <ø> (-51.10%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/client/ResultSetGroup.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFNldEdyb3VwLmphdmE=) | `65.38% <ø> (+0.16%)` | :arrow_up: |
   | ... and [1194 more](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=footer). Last update [0f398a7...5a7f950](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] fx19880617 merged pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 merged pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525


   




[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569203392



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);

Review comment:
       Is this code the same as the OPTIONAL branch?

##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {

Review comment:
       If the field is REPEATED, shouldn't we use extractList()?

##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {

Review comment:
       (nit) Consider renaming `field` to `fieldIndex` or `columnIndex`, and `index` to `repetitionValueIndex` or something similar.
   
   `index` doesn't represent a row index, right? I believe it comes from the field repetition count (`getFieldRepetitionCount`)?
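
To make the naming suggestion concrete, here is a small sketch of the two `extractValue` overloads with the proposed parameter names. It is only an illustration: a record is modeled as a list of fields, each holding a list of repeated values, which is a hypothetical stand-in for Parquet's `Group`, and `RepetitionSketch` is an invented name. The second index counts repetitions of one field within a single record, not rows.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in: record.get(fieldIndex) holds all repeated values of
// that field within ONE record (the role played by Parquet's Group).
class RepetitionSketch {

  /** Returns null for no values, the bare value for one, and an Object[] for several. */
  static Object extractValue(List<? extends List<?>> record, int fieldIndex) {
    // Plays the role of getFieldRepetitionCount(fieldIndex).
    int valueCount = record.get(fieldIndex).size();
    if (valueCount == 0) {
      return null;
    }
    if (valueCount == 1) {
      return extractValue(record, fieldIndex, 0);
    }
    Object[] results = new Object[valueCount];
    for (int repetitionIndex = 0; repetitionIndex < valueCount; repetitionIndex++) {
      results[repetitionIndex] = extractValue(record, fieldIndex, repetitionIndex);
    }
    return results;
  }

  /** Reads the repetitionIndex-th value of one field; repetitionIndex is not a row index. */
  static Object extractValue(List<? extends List<?>> record, int fieldIndex, int repetitionIndex) {
    return record.get(fieldIndex).get(repetitionIndex);
  }

  public static void main(String[] args) {
    List<List<?>> record =
        Arrays.asList(Collections.emptyList(), Arrays.asList("a"), Arrays.asList(1, 2, 3));
    System.out.println(extractValue(record, 0));                            // null
    System.out.println(extractValue(record, 1));                            // a
    System.out.println(Arrays.toString((Object[]) extractValue(record, 2))); // [1, 2, 3]
  }
}
```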

##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {

Review comment:
       (nit) consider renaming field to fieldIndex or columnIndex

##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {

Review comment:
       Although this is not on the performance-critical path, I think Parquet does provide efficient columnar readers. We should consider using them.
   
   Also, this interface is not clear. The extraction happens one Parquet record at a time, but the method takes a RowGroup?

##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetAvroRecordReader implements RecordReader {

Review comment:
       If the underlying file is Parquet, why are we using AvroRecordExtractor in this reader? 
   Also, the class name is confusing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r577768640



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int fieldIndex) {
+    int valueCount = from.getFieldRepetitionCount(fieldIndex);
+    Type fieldType = from.getType().getType(fieldIndex);
+    if (valueCount == 0) {
+      return null;

Review comment:
       I think that in Parquet, definition levels are used to determine nullability. So if we are going to store null in the Pinot GenericRow, shouldn't we rely on the definition level?
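
To make the question concrete, here is a minimal JDK-only sketch of how definition levels mark nulls in a flat OPTIONAL column: a slot is null whenever its definition level is below the column's maximum. It does not use the Parquet API; the class name, method name and sample data are all hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

final class DefinitionLevelSketch {

  /**
   * Rebuilds a nullable column from per-slot definition levels and the
   * densely stored non-null values (illustrative, not Parquet's reader).
   */
  static List<Long> assemble(int[] definitionLevels, long[] nonNullValues, int maxDefinitionLevel) {
    List<Long> column = new ArrayList<>(definitionLevels.length);
    int next = 0;
    for (int level : definitionLevels) {
      if (level == maxDefinitionLevel) {
        // Fully defined: consume the next stored value.
        column.add(nonNullValues[next++]);
      } else {
        // Level below the maximum: the slot is null and no value was stored for it.
        column.add(null);
      }
    }
    return column;
  }

  public static void main(String[] args) {
    // Three slots, the middle one null; prints [10, null, 30]
    System.out.println(assemble(new int[] {1, 0, 1}, new long[] {10L, 30L}, 1));
  }
}
```

The quoted extractor works on assembled Group records rather than on raw column pages, so whether a repetition count of 0 is equivalent to this check at that layer is a question for the PR author; the sketch does not settle it.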






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569209958



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       I think an example of how a nested Parquet record is extracted into a Pinot GenericRow would help a lot.
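
As a rough illustration of what such an example could look like, the sketch below builds the extracted shape of one hypothetical record in plain Java, with a nested group as a Map<String, Object> and a repeated field as an Object[]. The field names and values are invented, and the shape shown is before any convert step. It also shows why plain Map.equals is not enough once values contain arrays, which is presumably the motivation for compareMap in this hunk; deepEquals here is a stand-in, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class NestedRowSketch {

  /**
   * Hypothetical extracted form of a record with a string field "name",
   * a nested group "address" (city, zip) and a repeated int field "scores".
   */
  static Map<String, Object> sampleRow() {
    Map<String, Object> address = new HashMap<>();
    address.put("city", "sf");
    address.put("zip", 94105);

    Map<String, Object> row = new HashMap<>();
    row.put("name", "alice");
    row.put("address", address);                // nested group -> Map
    row.put("scores", new Object[] {1, 2, 3});  // repeated field -> Object[]
    return row;
  }

  /** Deep comparison that descends into nested maps and arrays. */
  static boolean deepEquals(Object left, Object right) {
    if (left instanceof Map && right instanceof Map) {
      Map<?, ?> leftMap = (Map<?, ?>) left;
      Map<?, ?> rightMap = (Map<?, ?>) right;
      if (leftMap.size() != rightMap.size()) {
        return false;
      }
      for (Map.Entry<?, ?> entry : leftMap.entrySet()) {
        if (!rightMap.containsKey(entry.getKey())
            || !deepEquals(entry.getValue(), rightMap.get(entry.getKey()))) {
          return false;
        }
      }
      return true;
    }
    if (left instanceof Object[] && right instanceof Object[]) {
      Object[] leftArray = (Object[]) left;
      Object[] rightArray = (Object[]) right;
      if (leftArray.length != rightArray.length) {
        return false;
      }
      for (int i = 0; i < leftArray.length; i++) {
        if (!deepEquals(leftArray[i], rightArray[i])) {
          return false;
        }
      }
      return true;
    }
    return Objects.equals(left, right);
  }

  public static void main(String[] args) {
    // Map.equals compares the two Object[] values by identity, so this prints false.
    System.out.println(sampleRow().equals(sampleRow()));
    // The deep comparison looks inside the arrays and nested maps, so this prints true.
    System.out.println(deepEquals(sampleRow(), sampleRow()));
  }
}
```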






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r577767099



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));

Review comment:
       (nit) you can stash away from.getType() once before the loop

##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int fieldIndex) {
+    int valueCount = from.getFieldRepetitionCount(fieldIndex);
+    Type fieldType = from.getType().getType(fieldIndex);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, fieldIndex, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {

Review comment:
       Please consider adding a comment that this loop is for multi-value (repeated field)
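
A possible shape for that method with the comment added, using the parameter names suggested elsewhere in this review (fieldIndex, repetitionIndex). This is only a sketch: RepeatedSource is a hypothetical stand-in for the two Group accessors the hunk uses, so the example runs without the Parquet dependency.

```java
import java.util.Arrays;

final class RepeatedFieldSketch {

  /** Hypothetical stand-in for the Group accessors used by the hunk. */
  interface RepeatedSource {
    int getFieldRepetitionCount(int fieldIndex);

    Object getValue(int fieldIndex, int repetitionIndex);
  }

  /** Wraps a jagged array laid out as data[fieldIndex][repetitionIndex]. */
  static RepeatedSource of(Object[][] data) {
    return new RepeatedSource() {
      @Override
      public int getFieldRepetitionCount(int fieldIndex) {
        return data[fieldIndex].length;
      }

      @Override
      public Object getValue(int fieldIndex, int repetitionIndex) {
        return data[fieldIndex][repetitionIndex];
      }
    };
  }

  static Object extractValue(RepeatedSource from, int fieldIndex) {
    int valueCount = from.getFieldRepetitionCount(fieldIndex);
    if (valueCount == 0) {
      // No value present for this field in this record.
      return null;
    }
    if (valueCount == 1) {
      // Single value: return it directly rather than a one-element array.
      return from.getValue(fieldIndex, 0);
    }
    // Multi-value (repeated) field: collect every repetition into an array.
    Object[] results = new Object[valueCount];
    for (int repetitionIndex = 0; repetitionIndex < valueCount; repetitionIndex++) {
      results[repetitionIndex] = from.getValue(fieldIndex, repetitionIndex);
    }
    return results;
  }

  public static void main(String[] args) {
    RepeatedSource source = of(new Object[][] {{}, {"a"}, {1, 2, 3}});
    System.out.println(extractValue(source, 0));                             // null
    System.out.println(extractValue(source, 1));                             // a
    System.out.println(Arrays.toString((Object[]) extractValue(source, 2))); // [1, 2, 3]
  }
}
```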






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r577769463



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {

Review comment:
       Understood. Thanks
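
For readers following the INT96 branch in the hunk above, here is a minimal JDK-only sketch of the same arithmetic. It assumes the 12-byte layout the quoted code reads: nanoseconds of day in bytes 0-7 and the Julian day number in bytes 8-11, both little-endian. The class name, the encode helper and the local MILLIS_PER_DAY constant are illustrative and not part of the PR, which takes that constant from Joda-Time's DateTimeConstants.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class Int96TimestampSketch {
  static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588L;
  static final long NANOS_PER_MILLISECOND = 1_000_000L;
  static final long MILLIS_PER_DAY = 86_400_000L;

  /** Converts a 12-byte INT96 timestamp to milliseconds since the Unix epoch. */
  static long int96ToEpochMillis(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0); // bytes 0-7
    long julianDay = buf.getInt(8);   // bytes 8-11
    return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY
        + nanosOfDay / NANOS_PER_MILLISECOND;
  }

  /** Test helper (hypothetical): builds the 12-byte value in the assumed layout. */
  static byte[] encode(int julianDay, long nanosOfDay) {
    return ByteBuffer.allocate(12)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putLong(nanosOfDay)
        .putInt(julianDay)
        .array();
  }

  public static void main(String[] args) {
    // Julian day 2440588 at midnight is the Unix epoch: prints 0
    System.out.println(int96ToEpochMillis(encode(2440588, 0L)));
    // One day later plus 1.5 ms of nanos, truncated to whole millis: prints 86400001
    System.out.println(int96ToEpochMillis(encode(2440589, 1_500_000L)));
  }
}
```

Note that the division truncates sub-millisecond precision, which matches the PR description's INT96 (nanoseconds) to LONG (milliseconds) conversion.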






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r578135344



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {

Review comment:
       I think it's still parsed as Object[0] instead of Object, so the `.isArray()` check will be true.






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569213492



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {

Review comment:
       You can treat this Group as a single JSON object.
   This method only extracts one row.






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569217718



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY

Review comment:
       Because in Parquet an INT96 timestamp carries nanosecond precision, while this reader returns a LONG in milliseconds. FYI: https://stackoverflow.com/questions/53103762/cast-int96-timestamp-from-parquet-to-golang/53104516#53104516
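
For reference, the conversion in the hunk above can be sketched in isolation. This is a minimal sketch, not the PR's code: the class name `Int96TimestampSketch` and the `encode` helper are hypothetical, and it assumes the INT96 layout the diff relies on (8 little-endian bytes of nanoseconds within the day, followed by a 4-byte Julian day number). `MILLIS_PER_DAY` is inlined here instead of taken from Joda-Time.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class Int96TimestampSketch {
  // Julian day number of 1970-01-01, same constant as in the diff.
  static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588L;
  static final long NANOS_PER_MILLISECOND = 1_000_000L;
  static final long MILLIS_PER_DAY = 86_400_000L;

  /** Converts a 12-byte INT96 timestamp to milliseconds since the Unix epoch. */
  static long toEpochMillis(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0); // bytes 0..7: nanoseconds within the day
    long julianDay = buf.getInt(8);   // bytes 8..11: Julian day number
    // Sub-millisecond precision is truncated by the integer division.
    return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY
        + nanosOfDay / NANOS_PER_MILLISECOND;
  }

  /** Hypothetical helper that builds the 12-byte layout for experiments. */
  static byte[] encode(int julianDay, long nanosOfDay) {
    return ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
        .putLong(nanosOfDay).putInt(julianDay).array();
  }
}
```

With this sketch, a value encoded as Julian day 2440588 and zero nanoseconds maps to epoch millisecond 0.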






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569210587



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetAvroRecordReader implements RecordReader {

Review comment:
       This is the current ParquetRecordReader; under the hood it uses the `parquet-avro` library (https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html) to handle the read/write logic.






[GitHub] [incubator-pinot] fx19880617 commented on pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#issuecomment-772392418


   Adding many test parquet files.




[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569220841



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {

Review comment:
       Because the result type is `Object[]`, not `List`.
   `Object[]` is not an instance of `Collection`.
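
To illustrate the point, here is a small sketch with a hypothetical `describe` helper (not part of the PR). It shows that an `instanceof Collection` test never matches an `Object[]`, so an array has to be detected with `getClass().isArray()`.

```java
import java.util.Collection;

final class ArrayVersusCollectionSketch {
  /** Hypothetical helper: reports how a value would be classified. */
  static String describe(Object value) {
    if (value instanceof Collection) {
      return "collection"; // List, Set, ... but never a Java array
    }
    if (value != null && value.getClass().isArray()) {
      return "array"; // Object[], int[], ... including empty arrays
    }
    return "single";
  }
}
```

Under this sketch, `new Object[]{1, 2}` is classified as an array and `Arrays.asList(1, 2)` as a collection, which is why the extractor checks `.isArray()` on `list[0]`.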






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569214865



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {

Review comment:
       Yes, the index comes from the field's repetition count (`getFieldRepetitionCount`).
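
As a rough illustration of the point above, the sketch below replaces Parquet's `Group` with a hypothetical stand-in (a list of value lists, one per field), so it is not the PR's code. It only shows how the repetition count bounds the value index and decides whether the result is null, a single value, or an array.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Parquet Group: each field holds the list of values
// written for it (empty when an optional field is absent). Not part of parquet-mr.
final class RepetitionIndexSketch {

  // Mirrors the shape of the quoted extractValue(Group, int): the repetition count
  // decides the result shape and is the upper bound for every value index.
  static Object extractValue(List<List<Object>> group, int field) {
    List<Object> values = group.get(field);
    int valueCount = values.size(); // plays the role of getFieldRepetitionCount(field)
    if (valueCount == 0) {
      return null;
    }
    if (valueCount == 1) {
      return values.get(0);
    }
    Object[] results = new Object[valueCount];
    for (int index = 0; index < valueCount; index++) {
      results[index] = values.get(index); // index always stays below valueCount
    }
    return results;
  }

  public static void main(String[] args) {
    List<List<Object>> group = Arrays.asList(
        Arrays.<Object>asList(),
        Arrays.<Object>asList(42),
        Arrays.<Object>asList("a", "b", "c"));
    System.out.println(extractValue(group, 0));
    System.out.println(extractValue(group, 1));
    System.out.println(Arrays.toString((Object[]) extractValue(group, 2)));
  }
}
```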




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569221700



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {

Review comment:
       This extraction logic does not flatten the value; I tried to keep the logic the same as the existing implementation.
   When Pinot indexes this row, it should decide how to treat this Map or List.
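
To make the distinction concrete, here is a small sketch with hypothetical names. The nested `row` map is the kind of shape the comment says the extractor keeps, while `flatten` is a contrast helper that is not part of Pinot or of this PR. The sample values are borrowed from the `githubActivities` record quoted elsewhere in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NestedVersusFlattened {

  // Hypothetical helper, shown only for contrast: it turns nested maps into
  // dotted keys. The extractor under review does NOT do this.
  static Map<String, Object> flatten(String prefix, Map<String, Object> nested) {
    Map<String, Object> flat = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : nested.entrySet()) {
      String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
      Object value = entry.getValue();
      if (value instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> child = (Map<String, Object>) value;
        flat.putAll(flatten(key, child));
      } else {
        flat.put(key, value);
      }
    }
    return flat;
  }

  public static void main(String[] args) {
    Map<String, Object> actor = new LinkedHashMap<>();
    actor.put("id", 5439615);
    actor.put("login", "johndmulhausen");

    Map<String, Object> row = new LinkedHashMap<>();
    row.put("actor", actor); // nested group kept as a Map value
    row.put("id", "4243624603");

    System.out.println("nested:    " + row);
    System.out.println("flattened: " + flatten("", row));
  }
}
```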






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569208452



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       Does this comparison need to happen at every level of the hierarchy?
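
One reason a recursive comparison can be needed: Java arrays use identity equality, so `Map.equals` reports two maps as different when they hold separate `Object[]` instances with the same contents. The sketch below is an assumption-laden illustration, not the PR's `compareMap`; `deepEquals` is a hypothetical helper that descends into nested maps and arrays.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class DeepMapCompare {

  // Hypothetical helper: compares values structurally, descending into nested
  // Maps, Object[] arrays and byte[] arrays; everything else uses equals().
  static boolean deepEquals(Object a, Object b) {
    if (a == b) {
      return true;
    }
    if (a == null || b == null) {
      return false;
    }
    if (a instanceof Map && b instanceof Map) {
      Map<?, ?> left = (Map<?, ?>) a;
      Map<?, ?> right = (Map<?, ?>) b;
      if (left.size() != right.size()) {
        return false;
      }
      for (Map.Entry<?, ?> entry : left.entrySet()) {
        if (!right.containsKey(entry.getKey())
            || !deepEquals(entry.getValue(), right.get(entry.getKey()))) {
          return false;
        }
      }
      return true;
    }
    if (a instanceof Object[] && b instanceof Object[]) {
      Object[] left = (Object[]) a;
      Object[] right = (Object[]) b;
      if (left.length != right.length) {
        return false;
      }
      for (int i = 0; i < left.length; i++) {
        if (!deepEquals(left[i], right[i])) {
          return false;
        }
      }
      return true;
    }
    if (a instanceof byte[] && b instanceof byte[]) {
      return Arrays.equals((byte[]) a, (byte[]) b);
    }
    return a.equals(b);
  }

  public static void main(String[] args) {
    Map<String, Object> left = new HashMap<>();
    left.put("tags", new Object[]{"a", "b"});
    Map<String, Object> right = new HashMap<>();
    right.put("tags", new Object[]{"a", "b"});

    // Map.equals compares the two arrays by identity; deepEquals by content.
    System.out.println("Map.equals: " + left.equals(right));
    System.out.println("deepEquals: " + deepEquals(left, right));
  }
}
```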






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r577765215



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY

Review comment:
       Got it. Thanks
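
For readers following the INT96 branch quoted above, here is a self-contained sketch of the same arithmetic using only `java.nio`. It assumes the common INT96 timestamp layout (8 bytes of nanoseconds within the day, then 4 bytes of Julian day number, little-endian), which matches the `getLong(0)` and `getInt(8)` reads in the diff. The class name and the `encode` helper are hypothetical and exist only to build test input.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class Int96TimestampSketch {
  static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588L;
  static final long MILLIS_PER_DAY = 86_400_000L;
  static final long NANOS_PER_MILLISECOND = 1_000_000L;

  // Converts a 12-byte INT96 timestamp to milliseconds since the Unix epoch.
  // Sub-millisecond precision is truncated by the integer division.
  static long int96ToEpochMillis(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0);
    long julianDay = buf.getInt(8);
    return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY
        + nanosOfDay / NANOS_PER_MILLISECOND;
  }

  // Hypothetical helper that builds an INT96 value in the assumed layout.
  static byte[] encode(long nanosOfDay, int julianDay) {
    return ByteBuffer.allocate(12)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putLong(nanosOfDay)
        .putInt(julianDay)
        .array();
  }

  public static void main(String[] args) {
    // Julian day 2440588 with zero nanoseconds is the Unix epoch itself.
    System.out.println(int96ToEpochMillis(encode(0L, 2440588)));
    // One day later plus 1.5 ms: (1 * 86,400,000) + (1,500,000 / 1,000,000) = 86,400,001.
    System.out.println(int96ToEpochMillis(encode(1_500_000L, 2440589)));
  }
}
```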






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569220319



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {

Review comment:
       You are right; I am merging these branches together.






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569221700



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {

Review comment:
       We are not flattening it; I tried to keep the logic the same as the existing implementation.
   When Pinot indexes this row, it will handle how to treat this Map or List.






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569288948



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       Below is one example of a large nested record in the `githubActivities.snappy.parquet` file:
   ```
   {
   	"actor": {
   		"avatar_url": "https://avatars.githubusercontent.com/u/5439615?",
   		"display_login": "johndmulhausen",
   		"gravatar_id": "",
   		"id": 5439615,
   		"login": "johndmulhausen",
   		"url": "https://api.github.com/users/johndmulhausen"
   	},
   	"created_at": "2016-07-07T00:00:34Z",
   	"id": "4243624603",
   	"org": {
   		"avatar_url": "https://avatars.githubusercontent.com/u/13629408?",
   		"gravatar_id": "",
   		"id": 13629408,
   		"login": "kubernetes",
   		"url": "https://api.github.com/orgs/kubernetes"
   	},
   	"payload": {
   		"action": "closed",
   		"number": 507,
   		"pull_request": {
   			"_links": {
   				"comments": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507/comments"
   				},
   				"commits": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/commits"
   				},
   				"html": {
   					"href": "https://github.com/kubernetes/kubernetes.github.io/pull/507"
   				},
   				"issue": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507"
   				},
   				"review_comment": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/comments{/number}"
   				},
   				"review_comments": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/comments"
   				},
   				"self": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507"
   				},
   				"statuses": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/statuses/0700c5ca798fce6d1f323ca70baa5ef45e82e491"
   				}
   			},
   			"additions": 38,
   			"assignees": {},
   			"base": {
   				"label": "kubernetes:master",
   				"ref": "master",
   				"repo": {
   					"archive_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/{archive_format}{/ref}",
   					"assignees_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/assignees{/user}",
   					"blobs_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/blobs{/sha}",
   					"branches_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/branches{/branch}",
   					"clone_url": "https://github.com/kubernetes/kubernetes.github.io.git",
   					"collaborators_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/collaborators{/collaborator}",
   					"comments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/comments{/number}",
   					"commits_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/commits{/sha}",
   					"compare_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/compare/{base}...{head}",
   					"contents_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/contents/{+path}",
   					"contributors_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/contributors",
   					"created_at": "2016-02-10T22:46:48Z",
   					"default_branch": "master",
   					"deployments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/deployments",
   					"description": "Website/documentation repo",
   					"downloads_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/downloads",
   					"events_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/events",
   					"fork": false,
   					"forks": 329,
   					"forks_count": 329,
   					"forks_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/forks",
   					"full_name": "kubernetes/kubernetes.github.io",
   					"git_commits_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/commits{/sha}",
   					"git_refs_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/refs{/sha}",
   					"git_tags_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/tags{/sha}",
   					"git_url": "git://github.com/kubernetes/kubernetes.github.io.git",
   					"has_downloads": true,
   					"has_issues": true,
   					"has_pages": true,
   					"has_wiki": true,
   					"hooks_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/hooks",
   					"html_url": "https://github.com/kubernetes/kubernetes.github.io",
   					"id": 51478266,
   					"issue_comment_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/comments{/number}",
   					"issue_events_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/events{/number}",
   					"issues_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues{/number}",
   					"keys_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/keys{/key_id}",
   					"labels_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/labels{/name}",
   					"language": "HTML",
   					"languages_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/languages",
   					"merges_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/merges",
   					"milestones_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/milestones{/number}",
   					"name": "kubernetes.github.io",
   					"notifications_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/notifications{?since,all,participating}",
   					"open_issues": 289,
   					"open_issues_count": 289,
   					"owner": {
   						"avatar_url": "https://avatars.githubusercontent.com/u/13629408?v=3",
   						"events_url": "https://api.github.com/users/kubernetes/events{/privacy}",
   						"followers_url": "https://api.github.com/users/kubernetes/followers",
   						"following_url": "https://api.github.com/users/kubernetes/following{/other_user}",
   						"gists_url": "https://api.github.com/users/kubernetes/gists{/gist_id}",
   						"gravatar_id": "",
   						"html_url": "https://github.com/kubernetes",
   						"id": 13629408,
   						"login": "kubernetes",
   						"organizations_url": "https://api.github.com/users/kubernetes/orgs",
   						"received_events_url": "https://api.github.com/users/kubernetes/received_events",
   						"repos_url": "https://api.github.com/users/kubernetes/repos",
   						"site_admin": false,
   						"starred_url": "https://api.github.com/users/kubernetes/starred{/owner}{/repo}",
   						"subscriptions_url": "https://api.github.com/users/kubernetes/subscriptions",
   						"type": "Organization",
   						"url": "https://api.github.com/users/kubernetes"
   					},
   					"private": false,
   					"pulls_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls{/number}",
   					"pushed_at": "2016-07-07T00:00:33Z",
   					"releases_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/releases{/id}",
   					"size": 39878,
   					"ssh_url": "git@github.com:kubernetes/kubernetes.github.io.git",
   					"stargazers_count": 55,
   					"stargazers_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/stargazers",
   					"statuses_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/statuses/{sha}",
   					"subscribers_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/subscribers",
   					"subscription_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/subscription",
   					"svn_url": "https://github.com/kubernetes/kubernetes.github.io",
   					"tags_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/tags",
   					"teams_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/teams",
   					"trees_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/trees{/sha}",
   					"updated_at": "2016-07-06T22:19:07Z",
   					"url": "https://api.github.com/repos/kubernetes/kubernetes.github.io",
   					"watchers": 55,
   					"watchers_count": 55
   				},
   				"sha": "85eba1f0b3100abd75cb5c0355ff6a474f977a07",
   				"user": {
   					"avatar_url": "https://avatars.githubusercontent.com/u/13629408?v=3",
   					"events_url": "https://api.github.com/users/kubernetes/events{/privacy}",
   					"followers_url": "https://api.github.com/users/kubernetes/followers",
   					"following_url": "https://api.github.com/users/kubernetes/following{/other_user}",
   					"gists_url": "https://api.github.com/users/kubernetes/gists{/gist_id}",
   					"gravatar_id": "",
   					"html_url": "https://github.com/kubernetes",
   					"id": 13629408,
   					"login": "kubernetes",
   					"organizations_url": "https://api.github.com/users/kubernetes/orgs",
   					"received_events_url": "https://api.github.com/users/kubernetes/received_events",
   					"repos_url": "https://api.github.com/users/kubernetes/repos",
   					"site_admin": false,
   					"starred_url": "https://api.github.com/users/kubernetes/starred{/owner}{/repo}",
   					"subscriptions_url": "https://api.github.com/users/kubernetes/subscriptions",
   					"type": "Organization",
   					"url": "https://api.github.com/users/kubernetes"
   				}
   			},
   			"body": "Mostly the same as https://github.com/kubernetes/kubernetes/pull/25574",
   			"changed_files": 1,
   			"closed_at": "2016-07-07T00:00:33Z",
   			"comments": 2,
   			"comments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507/comments",
   			"commits": 1,
   			"commits_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/commits",
   			"created_at": "2016-05-13T16:01:23Z",
   			"deletions": 0,
   			"diff_url": "https://github.com/kubernetes/kubernetes.github.io/pull/507.diff",
   			"head": {
   				"label": "therc:patch-2",
   				"ref": "patch-2",
   				"repo": {
   					"archive_url": "https://api.github.com/repos/therc/kubernetes.github.io/{archive_format}{/ref}",
   					"assignees_url": "https://api.github.com/repos/therc/kubernetes.github.io/assignees{/user}",
   					"blobs_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/blobs{/sha}",
   					"branches_url": "https://api.github.com/repos/therc/kubernetes.github.io/branches{/branch}",
   					"clone_url": "https://github.com/therc/kubernetes.github.io.git",
   					"collaborators_url": "https://api.github.com/repos/therc/kubernetes.github.io/collaborators{/collaborator}",
   					"comments_url": "https://api.github.com/repos/therc/kubernetes.github.io/comments{/number}",
   					"commits_url": "https://api.github.com/repos/therc/kubernetes.github.io/commits{/sha}",
   					"compare_url": "https://api.github.com/repos/therc/kubernetes.github.io/compare/{base}...{head}",
   					"contents_url": "https://api.github.com/repos/therc/kubernetes.github.io/contents/{+path}",
   					"contributors_url": "https://api.github.com/repos/therc/kubernetes.github.io/contributors",
   					"created_at": "2016-03-14T21:11:34Z",
   					"default_branch": "master",
   					"deployments_url": "https://api.github.com/repos/therc/kubernetes.github.io/deployments",
   					"description": "Website/documentation repo",
   					"downloads_url": "https://api.github.com/repos/therc/kubernetes.github.io/downloads",
   					"events_url": "https://api.github.com/repos/therc/kubernetes.github.io/events",
   					"fork": true,
   					"forks": 0,
   					"forks_count": 0,
   					"forks_url": "https://api.github.com/repos/therc/kubernetes.github.io/forks",
   					"full_name": "therc/kubernetes.github.io",
   					"git_commits_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/commits{/sha}",
   					"git_refs_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/refs{/sha}",
   					"git_tags_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/tags{/sha}",
   					"git_url": "git://github.com/therc/kubernetes.github.io.git",
   					"has_downloads": true,
   					"has_issues": false,
   					"has_pages": false,
   					"has_wiki": true,
   					"hooks_url": "https://api.github.com/repos/therc/kubernetes.github.io/hooks",
   					"html_url": "https://github.com/therc/kubernetes.github.io",
   					"id": 53892222,
   					"issue_comment_url": "https://api.github.com/repos/therc/kubernetes.github.io/issues/comments{/number}",
   					"issue_events_url": "https://api.github.com/repos/therc/kubernetes.github.io/issues/events{/number}",
   					"issues_url": "https://api.github.com/repos/therc/kubernetes.github.io/issues{/number}",
   					"keys_url": "https://api.github.com/repos/therc/kubernetes.github.io/keys{/key_id}",
   					"labels_url": "https://api.github.com/repos/therc/kubernetes.github.io/labels{/name}",
   					"language": "HTML",
   					"languages_url": "https://api.github.com/repos/therc/kubernetes.github.io/languages",
   					"merges_url": "https://api.github.com/repos/therc/kubernetes.github.io/merges",
   					"milestones_url": "https://api.github.com/repos/therc/kubernetes.github.io/milestones{/number}",
   					"name": "kubernetes.github.io",
   					"notifications_url": "https://api.github.com/repos/therc/kubernetes.github.io/notifications{?since,all,participating}",
   					"open_issues": 0,
   					"open_issues_count": 0,
   					"owner": {
   						"avatar_url": "https://avatars.githubusercontent.com/u/13481082?v=3",
   						"events_url": "https://api.github.com/users/therc/events{/privacy}",
   						"followers_url": "https://api.github.com/users/therc/followers",
   						"following_url": "https://api.github.com/users/therc/following{/other_user}",
   						"gists_url": "https://api.github.com/users/therc/gists{/gist_id}",
   						"gravatar_id": "",
   						"html_url": "https://github.com/therc",
   						"id": 13481082,
   						"login": "therc",
   						"organizations_url": "https://api.github.com/users/therc/orgs",
   						"received_events_url": "https://api.github.com/users/therc/received_events",
   						"repos_url": "https://api.github.com/users/therc/repos",
   						"site_admin": false,
   						"starred_url": "https://api.github.com/users/therc/starred{/owner}{/repo}",
   						"subscriptions_url": "https://api.github.com/users/therc/subscriptions",
   						"type": "User",
   						"url": "https://api.github.com/users/therc"
   					},
   					"private": false,
   					"pulls_url": "https://api.github.com/repos/therc/kubernetes.github.io/pulls{/number}",
   					"pushed_at": "2016-05-13T16:01:15Z",
   					"releases_url": "https://api.github.com/repos/therc/kubernetes.github.io/releases{/id}",
   					"size": 33202,
   					"ssh_url": "git@github.com:therc/kubernetes.github.io.git",
   					"stargazers_count": 0,
   					"stargazers_url": "https://api.github.com/repos/therc/kubernetes.github.io/stargazers",
   					"statuses_url": "https://api.github.com/repos/therc/kubernetes.github.io/statuses/{sha}",
   					"subscribers_url": "https://api.github.com/repos/therc/kubernetes.github.io/subscribers",
   					"subscription_url": "https://api.github.com/repos/therc/kubernetes.github.io/subscription",
   					"svn_url": "https://github.com/therc/kubernetes.github.io",
   					"tags_url": "https://api.github.com/repos/therc/kubernetes.github.io/tags",
   					"teams_url": "https://api.github.com/repos/therc/kubernetes.github.io/teams",
   					"trees_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/trees{/sha}",
   					"updated_at": "2016-03-14T21:11:36Z",
   					"url": "https://api.github.com/repos/therc/kubernetes.github.io",
   					"watchers": 0,
   					"watchers_count": 0
   				},
   				"sha": "0700c5ca798fce6d1f323ca70baa5ef45e82e491",
   				"user": {
   					"avatar_url": "https://avatars.githubusercontent.com/u/13481082?v=3",
   					"events_url": "https://api.github.com/users/therc/events{/privacy}",
   					"followers_url": "https://api.github.com/users/therc/followers",
   					"following_url": "https://api.github.com/users/therc/following{/other_user}",
   					"gists_url": "https://api.github.com/users/therc/gists{/gist_id}",
   					"gravatar_id": "",
   					"html_url": "https://github.com/therc",
   					"id": 13481082,
   					"login": "therc",
   					"organizations_url": "https://api.github.com/users/therc/orgs",
   					"received_events_url": "https://api.github.com/users/therc/received_events",
   					"repos_url": "https://api.github.com/users/therc/repos",
   					"site_admin": false,
   					"starred_url": "https://api.github.com/users/therc/starred{/owner}{/repo}",
   					"subscriptions_url": "https://api.github.com/users/therc/subscriptions",
   					"type": "User",
   					"url": "https://api.github.com/users/therc"
   				}
   			},
   			"html_url": "https://github.com/kubernetes/kubernetes.github.io/pull/507",
   			"id": 70019391,
   			"issue_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507",
   			"locked": false,
   			"merge_commit_sha": "a135f0a717803b35ec80563768d5abb4b45ac4d1",
   			"mergeable_state": "unknown",
   			"merged": true,
   			"merged_at": "2016-07-07T00:00:33Z",
   			"merged_by": {
   				"avatar_url": "https://avatars.githubusercontent.com/u/5439615?v=3",
   				"events_url": "https://api.github.com/users/johndmulhausen/events{/privacy}",
   				"followers_url": "https://api.github.com/users/johndmulhausen/followers",
   				"following_url": "https://api.github.com/users/johndmulhausen/following{/other_user}",
   				"gists_url": "https://api.github.com/users/johndmulhausen/gists{/gist_id}",
   				"gravatar_id": "",
   				"html_url": "https://github.com/johndmulhausen",
   				"id": 5439615,
   				"login": "johndmulhausen",
   				"organizations_url": "https://api.github.com/users/johndmulhausen/orgs",
   				"received_events_url": "https://api.github.com/users/johndmulhausen/received_events",
   				"repos_url": "https://api.github.com/users/johndmulhausen/repos",
   				"site_admin": false,
   				"starred_url": "https://api.github.com/users/johndmulhausen/starred{/owner}{/repo}",
   				"subscriptions_url": "https://api.github.com/users/johndmulhausen/subscriptions",
   				"type": "User",
   				"url": "https://api.github.com/users/johndmulhausen"
   			},
   			"number": 507,
   			"patch_url": "https://github.com/kubernetes/kubernetes.github.io/pull/507.patch",
   			"review_comment_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/comments{/number}",
   			"review_comments": 4,
   			"review_comments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/comments",
   			"state": "closed",
   			"statuses_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/statuses/0700c5ca798fce6d1f323ca70baa5ef45e82e491",
   			"title": "Update service doc with AWS ELB SSL annotations",
   			"updated_at": "2016-07-07T00:00:33Z",
   			"url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507",
   			"user": {
   				"avatar_url": "https://avatars.githubusercontent.com/u/13481082?v=3",
   				"events_url": "https://api.github.com/users/therc/events{/privacy}",
   				"followers_url": "https://api.github.com/users/therc/followers",
   				"following_url": "https://api.github.com/users/therc/following{/other_user}",
   				"gists_url": "https://api.github.com/users/therc/gists{/gist_id}",
   				"gravatar_id": "",
   				"html_url": "https://github.com/therc",
   				"id": 13481082,
   				"login": "therc",
   				"organizations_url": "https://api.github.com/users/therc/orgs",
   				"received_events_url": "https://api.github.com/users/therc/received_events",
   				"repos_url": "https://api.github.com/users/therc/repos",
   				"site_admin": false,
   				"starred_url": "https://api.github.com/users/therc/starred{/owner}{/repo}",
   				"subscriptions_url": "https://api.github.com/users/therc/subscriptions",
   				"type": "User",
   				"url": "https://api.github.com/users/therc"
   			}
   		}
   	},
   	"public": true,
   	"repo": {
   		"id": 51478266,
   		"name": "kubernetes/kubernetes.github.io",
   		"url": "https://api.github.com/repos/kubernetes/kubernetes.github.io"
   	},
   	"type": "PullRequestEvent"
   }
   ```
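
A record like this presumably matters for `equals` because, once extracted, nested values can be `Map`s and `Object[]` arrays, and Java arrays compare by identity under `Map.equals`. The sketch below illustrates that under stated assumptions: `NestedEqualsSketch`, `deepEquals`, and the array-valued `labels` field are hypothetical, and the PR's actual `compareMap` may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

class NestedEqualsSketch {
  // Hypothetical deep comparison that recurses into Maps and Object[] values.
  @SuppressWarnings("unchecked")
  static boolean deepEquals(Object a, Object b) {
    if (a == null || b == null) {
      return a == b;
    }
    if (a instanceof Map && b instanceof Map) {
      Map<String, Object> ma = (Map<String, Object>) a;
      Map<String, Object> mb = (Map<String, Object>) b;
      if (ma.size() != mb.size()) {
        return false;
      }
      for (Map.Entry<String, Object> e : ma.entrySet()) {
        if (!mb.containsKey(e.getKey()) || !deepEquals(e.getValue(), mb.get(e.getKey()))) {
          return false;
        }
      }
      return true;
    }
    if (a instanceof Object[] && b instanceof Object[]) {
      Object[] aa = (Object[]) a;
      Object[] ab = (Object[]) b;
      if (aa.length != ab.length) {
        return false;
      }
      for (int i = 0; i < aa.length; i++) {
        if (!deepEquals(aa[i], ab[i])) {
          return false;
        }
      }
      return true;
    }
    return a.equals(b);
  }

  // Builds a small nested row; "labels" is an invented array-valued field.
  static Map<String, Object> sample() {
    Map<String, Object> actor = new HashMap<>();
    actor.put("login", "johndmulhausen");
    Map<String, Object> row = new HashMap<>();
    row.put("actor", actor);
    row.put("labels", new Object[]{"a", "b"});
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> a = sample();
    Map<String, Object> b = sample();
    System.out.println(a.equals(b));      // false: the two Object[] values are different instances
    System.out.println(deepEquals(a, b)); // true: contents match element by element
  }
}
```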






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r577771627



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {

Review comment:
       What if the repetition count is 1 and the only element in the list (`list[0]`) is a primitive? In that case I think we should also return `(Object[]) list[0]`, but it looks like we won't, since `isArray()` won't evaluate to true.
   IIUC, `isArray()` will evaluate to true if the element inside the list is also a list?
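
The behaviour in question can be checked in isolation. The sketch below is hypothetical (`SingleElementListSketch` and `unwrap` are not in the PR) and mirrors only the last three branches of `extractList`. It assumes scalar values arrive boxed, for example as `Integer`. In that case `isArray()` is false and the one-element holder is returned unchanged, so the caller still gets an `Object[]`. Casting the boxed scalar itself to `Object[]` would throw a `ClassCastException`.

```java
import java.util.Arrays;

class SingleElementListSketch {
  // Hypothetical stand-in for the tail of extractList: 'list' holds one slot per field.
  static Object[] unwrap(Object[] list) {
    if (list.length == 1 && list[0] == null) {
      return null;
    }
    if (list.length == 1 && list[0].getClass().isArray()) {
      // True only when the single slot already holds an array (e.g. repeated values).
      return (Object[]) list[0];
    }
    // A lone boxed scalar falls through: the holder itself is the result.
    return list;
  }

  public static void main(String[] args) {
    Object scalar = 42;                     // autoboxed to Integer
    Object nested = new Object[]{1, 2, 3};

    System.out.println(scalar.getClass().isArray()); // false
    System.out.println(nested.getClass().isArray()); // true

    System.out.println(Arrays.toString(unwrap(new Object[]{scalar}))); // [42]
    System.out.println(Arrays.toString(unwrap(new Object[]{nested}))); // [1, 2, 3]
  }
}
```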






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569208582



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetNativeRecordReader implements RecordReader {
+  private Path _dataFilePath;
+  private ParquetNativeRecordExtractor _recordExtractor;
+  private MessageType _schema;
+  private ParquetMetadata _parquetMetadata;
+  private ParquetFileReader _parquetFileReader;
+  private Group _nextRecord;
+  private PageReadStore _pageReadStore;
+  private MessageColumnIO _columnIO;
+  private org.apache.parquet.io.RecordReader _parquetRecordReader;
+  private int _currentPageIdx;
+
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _recordExtractor = new ParquetNativeRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _schema = _parquetMetadata.getFileMetaData().getSchema();
+    _parquetFileReader =
+        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
+            _schema.getColumns());
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _columnIO = new ColumnIOFactory().getColumnIO(_schema);
+    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    _currentPageIdx = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (_pageReadStore == null) {
+      return false;
+    }
+    if (_pageReadStore.getRowCount() - _currentPageIdx >= 1) {
+      // System.out.println("_pageReadStore.getRowCount() = " + _pageReadStore.getRowCount() + ", _currentPageIdx = " + _currentPageIdx);
+      return true;
+    }
+    try {
+      _pageReadStore = _parquetFileReader.readNextRowGroup();
+      _currentPageIdx = 0;
+      if (_pageReadStore == null) {
+        return false;
+      }
+      _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return hasNext();
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    _nextRecord = (Group) _parquetRecordReader.read();
+    _recordExtractor.extract(_nextRecord, reuse);

Review comment:
       So, given a Parquet RowGroup, are we extracting its values into a single Pinot GenericRow? AFAIK, a Parquet RowGroup contains a chunk of rows for each column.
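
For context on the question above: the reader in the quoted diff pulls one record at a time from the current row group (`_parquetRecordReader.read()`) and moves to the next row group once the current one is exhausted. A minimal, dependency-free sketch of that iteration pattern follows. `RowGroupCursor` and its `int[]` of per-group row counts are hypothetical stand-ins, not the Parquet or Pinot API, and the loop in `hasNext()` is one possible alternative to the recursive call in the diff.

```java
import java.util.NoSuchElementException;

/** Hypothetical stand-in: walks rows one at a time across several row groups. */
public class RowGroupCursor {
  private final int[] rowCounts;   // rows per row group (assumed known up front)
  private int groupIdx = 0;        // index of the current row group
  private int rowIdxInGroup = 0;   // rows already consumed from the current group

  public RowGroupCursor(int[] rowCounts) {
    this.rowCounts = rowCounts.clone();
  }

  /** Skips exhausted (or empty) groups iteratively instead of recursing. */
  public boolean hasNext() {
    while (groupIdx < rowCounts.length && rowIdxInGroup >= rowCounts[groupIdx]) {
      groupIdx++;
      rowIdxInGroup = 0;
    }
    return groupIdx < rowCounts.length;
  }

  /** Returns {groupIndex, rowIndexWithinGroup} for the next row. */
  public int[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException("no more rows");
    }
    return new int[] {groupIdx, rowIdxInGroup++};
  }

  /** Counts all rows reachable through the cursor. */
  public static int countRows(int[] rowCounts) {
    RowGroupCursor cursor = new RowGroupCursor(rowCounts);
    int total = 0;
    while (cursor.hasNext()) {
      cursor.next();
      total++;
    }
    return total;
  }
}
```

Each call to `next()` yields exactly one row, so a row group with N rows produces N separate rows rather than one.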




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#issuecomment-781126845


   > Suggest reducing the size of the test resource files. We might not want over 30MB resource files just for testing the basic functionalities
   
   Makes sense; I reduced the data size.




[GitHub] [incubator-pinot] siddharthteotia commented on pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#issuecomment-772669608


   @fx19880617, I will do another pass later today.




[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569220408



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);

Review comment:
       Yes, I will merge this logic.
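
As a side note on the INT96 branch in the diff quoted above: it decodes a 12-byte little-endian value laid out as 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number, and converts it to epoch milliseconds. A standalone sketch of the same arithmetic, using only the JDK, is below. The class name, method names, and the `encode` helper are hypothetical and exist only for illustration; the constants mirror those in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper illustrating the INT96 -> epoch millis arithmetic. */
public class Int96Timestamps {
  static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588L;
  static final long MILLIS_PER_DAY = 86_400_000L;
  static final long NANOS_PER_MILLISECOND = 1_000_000L;

  /** Converts a 12-byte INT96 timestamp to milliseconds since the Unix epoch. */
  public static long toEpochMillis(byte[] int96) {
    if (int96.length != 12) {
      throw new IllegalArgumentException("INT96 must be 12 bytes, got " + int96.length);
    }
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0);  // bytes 0..7
    long julianDay = buf.getInt(8);    // bytes 8..11
    // Sub-millisecond precision is truncated by the integer division.
    return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY
        + nanosOfDay / NANOS_PER_MILLISECOND;
  }

  /** Builds a 12-byte INT96 value; used here only to produce sample input. */
  public static byte[] encode(long nanosOfDay, int julianDay) {
    return ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
        .putLong(nanosOfDay).putInt(julianDay).array();
  }
}
```

For example, a value encoded with Julian day 2440588 and zero nanoseconds corresponds to the Unix epoch itself.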






[GitHub] [incubator-pinot] codecov-io commented on pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#issuecomment-771975222


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=h1) Report
   > Merging [#6525](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=desc) (5a7f950) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.67%`.
   > The diff coverage is `60.09%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6525/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6525      +/-   ##
   ==========================================
   - Coverage   66.44%   64.77%   -1.68%     
   ==========================================
     Files        1075     1338     +263     
     Lines       54773    66023   +11250     
     Branches     8168     9651    +1483     
   ==========================================
   + Hits        36396    42768    +6372     
   - Misses      15700    20184    +4484     
   - Partials     2677     3071     +394     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `64.77% <60.09%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <ø> (+9.52%)` | :arrow_up: |
   | [...n/java/org/apache/pinot/client/BrokerResponse.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Jyb2tlclJlc3BvbnNlLmphdmE=) | `100.00% <ø> (ø)` | |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <ø> (-13.29%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/client/ExecutionStats.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0V4ZWN1dGlvblN0YXRzLmphdmE=) | `15.55% <ø> (ø)` | |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <ø> (-51.10%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/client/ResultSetGroup.java](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFNldEdyb3VwLmphdmE=) | `65.38% <ø> (+0.16%)` | :arrow_up: |
   | ... and [1194 more](https://codecov.io/gh/apache/incubator-pinot/pull/6525/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=footer). Last update [0f398a7...5a7f950](https://codecov.io/gh/apache/incubator-pinot/pull/6525?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r578135413



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int fieldIndex) {
+    int valueCount = from.getFieldRepetitionCount(fieldIndex);
+    Type fieldType = from.getType().getType(fieldIndex);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, fieldIndex, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {

Review comment:
       done.






[GitHub] [incubator-pinot] fx19880617 commented on pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#issuecomment-775047051


   > @fx19880617, I will do another pass later today.
   
   Can you do another pass?




[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569224109



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       I tried to make the test handle this logic automatically,
   so that the records read by ParquetAvroRecordReader and ParquetNativeRecordReader compare as equal.
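
One reason a plain `Map.equals` can report two rows as different is that Java arrays compare by identity, so two `Object[]` values with identical elements are not `equals`. A minimal sketch of a value-aware map comparison follows. `DeepMapEquals` is a hypothetical helper written for illustration; it is not the `compareMap` method from this PR, whose body is not shown in the quoted diff.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper: compares maps whose values may be arrays. */
public class DeepMapEquals {
  public static boolean deepEquals(Map<String, Object> a, Map<String, Object> b) {
    if (a.size() != b.size()) {
      return false;
    }
    for (Map.Entry<String, Object> entry : a.entrySet()) {
      if (!b.containsKey(entry.getKey())) {
        return false;
      }
      // Objects.deepEquals compares array contents (including nested arrays)
      // and falls back to equals() for everything else.
      if (!Objects.deepEquals(entry.getValue(), b.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }
}
```

This sketch does not recurse into nested maps that themselves contain arrays; a comparison that needs that would have to handle `Map` values explicitly.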






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569203293



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {

Review comment:
       Why do we need to check for isArray()?
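
For context on the question above, here is a minimal sketch of one reading of the quoted diff. It assumes a LIST group wraps a single repeated field, and that `extractValue` returns an `Object[]` when that field holds more than one value, so the outer one-element array has to be unwrapped. `ListUnwrapSketch` and `unwrap` are illustrative names and are not part of the PR.

```java
import java.util.Arrays;

class ListUnwrapSketch {
  // Mirrors only the tail of extractList as quoted in the diff: if the single
  // field of the LIST group already produced an array, return that array
  // instead of an array wrapping it.
  public static Object[] unwrap(Object[] list) {
    if (list.length == 1 && list[0] == null) {
      return null;
    }
    if (list.length == 1 && list[0].getClass().isArray()) {
      // Assumes the inner value is an Object[]; a primitive array would fail this cast.
      return (Object[]) list[0];
    }
    return list;
  }

  public static void main(String[] args) {
    // Repeated field with three values: the inner array is returned directly.
    Object[] wrapped = new Object[]{new Object[]{1, 2, 3}};
    System.out.println(Arrays.toString(unwrap(wrapped)));

    // Repeated field with one value: nothing to unwrap, the outer array is kept.
    Object[] single = new Object[]{42};
    System.out.println(Arrays.toString(unwrap(single)));
  }
}
```

Without the `isArray()` branch, the first case would yield a nested array rather than the list items themselves.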






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569208765



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {

Review comment:
       Are we not flattening?
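
To make the question concrete, the sketch below contrasts a nested `Map<String, Object>`, which is what `extractMap` appears to return according to its signature, with a flattened form. The `flatten` helper and its dotted-key naming are hypothetical and only illustrate what flattening could look like; nothing in the quoted diff does this.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FlattenSketch {
  // Hypothetical helper, not part of the PR: copies nested maps into a single
  // level, joining parent and child keys with '.'.
  public static Map<String, Object> flatten(Map<String, Object> nested) {
    Map<String, Object> flat = new LinkedHashMap<>();
    flattenInto("", nested, flat);
    return flat;
  }

  @SuppressWarnings("unchecked")
  private static void flattenInto(String prefix, Map<String, Object> source, Map<String, Object> target) {
    for (Map.Entry<String, Object> entry : source.entrySet()) {
      String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
      Object value = entry.getValue();
      if (value instanceof Map) {
        // Recurse into nested groups instead of storing the map itself.
        flattenInto(key, (Map<String, Object>) value, target);
      } else {
        target.put(key, value);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Object> actor = new LinkedHashMap<>();
    actor.put("id", 5439615);
    actor.put("login", "johndmulhausen");
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("actor", actor);
    row.put("id", "4243624603");

    System.out.println(row);          // nested form
    System.out.println(flatten(row)); // flattened form with dotted keys
  }
}
```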






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569290335



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       Still, the goal here is to verify that the generic rows parsed by the existing ParquetRecordReader and the new ParquetNativeRecordReader are the same.
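
One reason a custom comparison can be needed for such a parity check: `Map.equals` compares array values by reference, so two rows holding equal multi-value fields in distinct `Object[]` instances are reported as different. The sketch below shows a value-based comparison under that assumption. The body of `compareMap` is not shown in this excerpt, so `RowCompareSketch` and `sameValues` are illustrative names and not the PR's implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class RowCompareSketch {
  // Hypothetical value-based comparison of two field-to-value maps.
  public static boolean sameValues(Map<String, Object> left, Map<String, Object> right) {
    if (left.size() != right.size()) {
      return false;
    }
    for (Map.Entry<String, Object> entry : left.entrySet()) {
      if (!right.containsKey(entry.getKey())) {
        return false;
      }
      if (!sameValue(entry.getValue(), right.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }

  @SuppressWarnings("unchecked")
  private static boolean sameValue(Object a, Object b) {
    if (a instanceof Map && b instanceof Map) {
      return sameValues((Map<String, Object>) a, (Map<String, Object>) b);
    }
    // Wrapping both values lets Arrays.deepEquals handle nulls, scalars,
    // Object[] and primitive arrays uniformly. Maps nested inside arrays
    // still fall back to Map.equals, which this sketch does not address.
    return Arrays.deepEquals(new Object[]{a}, new Object[]{b});
  }

  public static void main(String[] args) {
    Map<String, Object> fromAvroReader = new HashMap<>();
    fromAvroReader.put("tags", new Object[]{"a", "b"});
    Map<String, Object> fromNativeReader = new HashMap<>();
    fromNativeReader.put("tags", new Object[]{"a", "b"});

    System.out.println(fromAvroReader.equals(fromNativeReader));     // false: arrays compared by reference
    System.out.println(sameValues(fromAvroReader, fromNativeReader)); // true: arrays compared by content
  }
}
```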






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r577762597



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetAvroRecordReader implements RecordReader {

Review comment:
       Understood now. This reader is for reading Avro data stored in Parquet files as per https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html
   Do you mind adding this to the javadoc?






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569288948



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       Below is one example of a large nested record:
   ```
   {
   	"actor": {
   		"avatar_url": "https://avatars.githubusercontent.com/u/5439615?",
   		"display_login": "johndmulhausen",
   		"gravatar_id": "",
   		"id": 5439615,
   		"login": "johndmulhausen",
   		"url": "https://api.github.com/users/johndmulhausen"
   	},
   	"created_at": "2016-07-07T00:00:34Z",
   	"id": "4243624603",
   	"org": {
   		"avatar_url": "https://avatars.githubusercontent.com/u/13629408?",
   		"gravatar_id": "",
   		"id": 13629408,
   		"login": "kubernetes",
   		"url": "https://api.github.com/orgs/kubernetes"
   	},
   	"payload": {
   		"action": "closed",
   		"number": 507,
   		"pull_request": {
   			"_links": {
   				"comments": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507/comments"
   				},
   				"commits": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/commits"
   				},
   				"html": {
   					"href": "https://github.com/kubernetes/kubernetes.github.io/pull/507"
   				},
   				"issue": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507"
   				},
   				"review_comment": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/comments{/number}"
   				},
   				"review_comments": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/comments"
   				},
   				"self": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507"
   				},
   				"statuses": {
   					"href": "https://api.github.com/repos/kubernetes/kubernetes.github.io/statuses/0700c5ca798fce6d1f323ca70baa5ef45e82e491"
   				}
   			},
   			"additions": 38,
   			"assignees": {},
   			"base": {
   				"label": "kubernetes:master",
   				"ref": "master",
   				"repo": {
   					"archive_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/{archive_format}{/ref}",
   					"assignees_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/assignees{/user}",
   					"blobs_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/blobs{/sha}",
   					"branches_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/branches{/branch}",
   					"clone_url": "https://github.com/kubernetes/kubernetes.github.io.git",
   					"collaborators_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/collaborators{/collaborator}",
   					"comments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/comments{/number}",
   					"commits_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/commits{/sha}",
   					"compare_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/compare/{base}...{head}",
   					"contents_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/contents/{+path}",
   					"contributors_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/contributors",
   					"created_at": "2016-02-10T22:46:48Z",
   					"default_branch": "master",
   					"deployments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/deployments",
   					"description": "Website/documentation repo",
   					"downloads_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/downloads",
   					"events_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/events",
   					"fork": false,
   					"forks": 329,
   					"forks_count": 329,
   					"forks_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/forks",
   					"full_name": "kubernetes/kubernetes.github.io",
   					"git_commits_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/commits{/sha}",
   					"git_refs_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/refs{/sha}",
   					"git_tags_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/tags{/sha}",
   					"git_url": "git://github.com/kubernetes/kubernetes.github.io.git",
   					"has_downloads": true,
   					"has_issues": true,
   					"has_pages": true,
   					"has_wiki": true,
   					"hooks_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/hooks",
   					"html_url": "https://github.com/kubernetes/kubernetes.github.io",
   					"id": 51478266,
   					"issue_comment_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/comments{/number}",
   					"issue_events_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/events{/number}",
   					"issues_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues{/number}",
   					"keys_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/keys{/key_id}",
   					"labels_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/labels{/name}",
   					"language": "HTML",
   					"languages_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/languages",
   					"merges_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/merges",
   					"milestones_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/milestones{/number}",
   					"name": "kubernetes.github.io",
   					"notifications_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/notifications{?since,all,participating}",
   					"open_issues": 289,
   					"open_issues_count": 289,
   					"owner": {
   						"avatar_url": "https://avatars.githubusercontent.com/u/13629408?v=3",
   						"events_url": "https://api.github.com/users/kubernetes/events{/privacy}",
   						"followers_url": "https://api.github.com/users/kubernetes/followers",
   						"following_url": "https://api.github.com/users/kubernetes/following{/other_user}",
   						"gists_url": "https://api.github.com/users/kubernetes/gists{/gist_id}",
   						"gravatar_id": "",
   						"html_url": "https://github.com/kubernetes",
   						"id": 13629408,
   						"login": "kubernetes",
   						"organizations_url": "https://api.github.com/users/kubernetes/orgs",
   						"received_events_url": "https://api.github.com/users/kubernetes/received_events",
   						"repos_url": "https://api.github.com/users/kubernetes/repos",
   						"site_admin": false,
   						"starred_url": "https://api.github.com/users/kubernetes/starred{/owner}{/repo}",
   						"subscriptions_url": "https://api.github.com/users/kubernetes/subscriptions",
   						"type": "Organization",
   						"url": "https://api.github.com/users/kubernetes"
   					},
   					"private": false,
   					"pulls_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls{/number}",
   					"pushed_at": "2016-07-07T00:00:33Z",
   					"releases_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/releases{/id}",
   					"size": 39878,
   					"ssh_url": "git@github.com:kubernetes/kubernetes.github.io.git",
   					"stargazers_count": 55,
   					"stargazers_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/stargazers",
   					"statuses_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/statuses/{sha}",
   					"subscribers_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/subscribers",
   					"subscription_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/subscription",
   					"svn_url": "https://github.com/kubernetes/kubernetes.github.io",
   					"tags_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/tags",
   					"teams_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/teams",
   					"trees_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/git/trees{/sha}",
   					"updated_at": "2016-07-06T22:19:07Z",
   					"url": "https://api.github.com/repos/kubernetes/kubernetes.github.io",
   					"watchers": 55,
   					"watchers_count": 55
   				},
   				"sha": "85eba1f0b3100abd75cb5c0355ff6a474f977a07",
   				"user": {
   					"avatar_url": "https://avatars.githubusercontent.com/u/13629408?v=3",
   					"events_url": "https://api.github.com/users/kubernetes/events{/privacy}",
   					"followers_url": "https://api.github.com/users/kubernetes/followers",
   					"following_url": "https://api.github.com/users/kubernetes/following{/other_user}",
   					"gists_url": "https://api.github.com/users/kubernetes/gists{/gist_id}",
   					"gravatar_id": "",
   					"html_url": "https://github.com/kubernetes",
   					"id": 13629408,
   					"login": "kubernetes",
   					"organizations_url": "https://api.github.com/users/kubernetes/orgs",
   					"received_events_url": "https://api.github.com/users/kubernetes/received_events",
   					"repos_url": "https://api.github.com/users/kubernetes/repos",
   					"site_admin": false,
   					"starred_url": "https://api.github.com/users/kubernetes/starred{/owner}{/repo}",
   					"subscriptions_url": "https://api.github.com/users/kubernetes/subscriptions",
   					"type": "Organization",
   					"url": "https://api.github.com/users/kubernetes"
   				}
   			},
   			"body": "Mostly the same as https://github.com/kubernetes/kubernetes/pull/25574",
   			"changed_files": 1,
   			"closed_at": "2016-07-07T00:00:33Z",
   			"comments": 2,
   			"comments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507/comments",
   			"commits": 1,
   			"commits_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/commits",
   			"created_at": "2016-05-13T16:01:23Z",
   			"deletions": 0,
   			"diff_url": "https://github.com/kubernetes/kubernetes.github.io/pull/507.diff",
   			"head": {
   				"label": "therc:patch-2",
   				"ref": "patch-2",
   				"repo": {
   					"archive_url": "https://api.github.com/repos/therc/kubernetes.github.io/{archive_format}{/ref}",
   					"assignees_url": "https://api.github.com/repos/therc/kubernetes.github.io/assignees{/user}",
   					"blobs_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/blobs{/sha}",
   					"branches_url": "https://api.github.com/repos/therc/kubernetes.github.io/branches{/branch}",
   					"clone_url": "https://github.com/therc/kubernetes.github.io.git",
   					"collaborators_url": "https://api.github.com/repos/therc/kubernetes.github.io/collaborators{/collaborator}",
   					"comments_url": "https://api.github.com/repos/therc/kubernetes.github.io/comments{/number}",
   					"commits_url": "https://api.github.com/repos/therc/kubernetes.github.io/commits{/sha}",
   					"compare_url": "https://api.github.com/repos/therc/kubernetes.github.io/compare/{base}...{head}",
   					"contents_url": "https://api.github.com/repos/therc/kubernetes.github.io/contents/{+path}",
   					"contributors_url": "https://api.github.com/repos/therc/kubernetes.github.io/contributors",
   					"created_at": "2016-03-14T21:11:34Z",
   					"default_branch": "master",
   					"deployments_url": "https://api.github.com/repos/therc/kubernetes.github.io/deployments",
   					"description": "Website/documentation repo",
   					"downloads_url": "https://api.github.com/repos/therc/kubernetes.github.io/downloads",
   					"events_url": "https://api.github.com/repos/therc/kubernetes.github.io/events",
   					"fork": true,
   					"forks": 0,
   					"forks_count": 0,
   					"forks_url": "https://api.github.com/repos/therc/kubernetes.github.io/forks",
   					"full_name": "therc/kubernetes.github.io",
   					"git_commits_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/commits{/sha}",
   					"git_refs_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/refs{/sha}",
   					"git_tags_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/tags{/sha}",
   					"git_url": "git://github.com/therc/kubernetes.github.io.git",
   					"has_downloads": true,
   					"has_issues": false,
   					"has_pages": false,
   					"has_wiki": true,
   					"hooks_url": "https://api.github.com/repos/therc/kubernetes.github.io/hooks",
   					"html_url": "https://github.com/therc/kubernetes.github.io",
   					"id": 53892222,
   					"issue_comment_url": "https://api.github.com/repos/therc/kubernetes.github.io/issues/comments{/number}",
   					"issue_events_url": "https://api.github.com/repos/therc/kubernetes.github.io/issues/events{/number}",
   					"issues_url": "https://api.github.com/repos/therc/kubernetes.github.io/issues{/number}",
   					"keys_url": "https://api.github.com/repos/therc/kubernetes.github.io/keys{/key_id}",
   					"labels_url": "https://api.github.com/repos/therc/kubernetes.github.io/labels{/name}",
   					"language": "HTML",
   					"languages_url": "https://api.github.com/repos/therc/kubernetes.github.io/languages",
   					"merges_url": "https://api.github.com/repos/therc/kubernetes.github.io/merges",
   					"milestones_url": "https://api.github.com/repos/therc/kubernetes.github.io/milestones{/number}",
   					"name": "kubernetes.github.io",
   					"notifications_url": "https://api.github.com/repos/therc/kubernetes.github.io/notifications{?since,all,participating}",
   					"open_issues": 0,
   					"open_issues_count": 0,
   					"owner": {
   						"avatar_url": "https://avatars.githubusercontent.com/u/13481082?v=3",
   						"events_url": "https://api.github.com/users/therc/events{/privacy}",
   						"followers_url": "https://api.github.com/users/therc/followers",
   						"following_url": "https://api.github.com/users/therc/following{/other_user}",
   						"gists_url": "https://api.github.com/users/therc/gists{/gist_id}",
   						"gravatar_id": "",
   						"html_url": "https://github.com/therc",
   						"id": 13481082,
   						"login": "therc",
   						"organizations_url": "https://api.github.com/users/therc/orgs",
   						"received_events_url": "https://api.github.com/users/therc/received_events",
   						"repos_url": "https://api.github.com/users/therc/repos",
   						"site_admin": false,
   						"starred_url": "https://api.github.com/users/therc/starred{/owner}{/repo}",
   						"subscriptions_url": "https://api.github.com/users/therc/subscriptions",
   						"type": "User",
   						"url": "https://api.github.com/users/therc"
   					},
   					"private": false,
   					"pulls_url": "https://api.github.com/repos/therc/kubernetes.github.io/pulls{/number}",
   					"pushed_at": "2016-05-13T16:01:15Z",
   					"releases_url": "https://api.github.com/repos/therc/kubernetes.github.io/releases{/id}",
   					"size": 33202,
   					"ssh_url": "git@github.com:therc/kubernetes.github.io.git",
   					"stargazers_count": 0,
   					"stargazers_url": "https://api.github.com/repos/therc/kubernetes.github.io/stargazers",
   					"statuses_url": "https://api.github.com/repos/therc/kubernetes.github.io/statuses/{sha}",
   					"subscribers_url": "https://api.github.com/repos/therc/kubernetes.github.io/subscribers",
   					"subscription_url": "https://api.github.com/repos/therc/kubernetes.github.io/subscription",
   					"svn_url": "https://github.com/therc/kubernetes.github.io",
   					"tags_url": "https://api.github.com/repos/therc/kubernetes.github.io/tags",
   					"teams_url": "https://api.github.com/repos/therc/kubernetes.github.io/teams",
   					"trees_url": "https://api.github.com/repos/therc/kubernetes.github.io/git/trees{/sha}",
   					"updated_at": "2016-03-14T21:11:36Z",
   					"url": "https://api.github.com/repos/therc/kubernetes.github.io",
   					"watchers": 0,
   					"watchers_count": 0
   				},
   				"sha": "0700c5ca798fce6d1f323ca70baa5ef45e82e491",
   				"user": {
   					"avatar_url": "https://avatars.githubusercontent.com/u/13481082?v=3",
   					"events_url": "https://api.github.com/users/therc/events{/privacy}",
   					"followers_url": "https://api.github.com/users/therc/followers",
   					"following_url": "https://api.github.com/users/therc/following{/other_user}",
   					"gists_url": "https://api.github.com/users/therc/gists{/gist_id}",
   					"gravatar_id": "",
   					"html_url": "https://github.com/therc",
   					"id": 13481082,
   					"login": "therc",
   					"organizations_url": "https://api.github.com/users/therc/orgs",
   					"received_events_url": "https://api.github.com/users/therc/received_events",
   					"repos_url": "https://api.github.com/users/therc/repos",
   					"site_admin": false,
   					"starred_url": "https://api.github.com/users/therc/starred{/owner}{/repo}",
   					"subscriptions_url": "https://api.github.com/users/therc/subscriptions",
   					"type": "User",
   					"url": "https://api.github.com/users/therc"
   				}
   			},
   			"html_url": "https://github.com/kubernetes/kubernetes.github.io/pull/507",
   			"id": 70019391,
   			"issue_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/issues/507",
   			"locked": false,
   			"merge_commit_sha": "a135f0a717803b35ec80563768d5abb4b45ac4d1",
   			"mergeable_state": "unknown",
   			"merged": true,
   			"merged_at": "2016-07-07T00:00:33Z",
   			"merged_by": {
   				"avatar_url": "https://avatars.githubusercontent.com/u/5439615?v=3",
   				"events_url": "https://api.github.com/users/johndmulhausen/events{/privacy}",
   				"followers_url": "https://api.github.com/users/johndmulhausen/followers",
   				"following_url": "https://api.github.com/users/johndmulhausen/following{/other_user}",
   				"gists_url": "https://api.github.com/users/johndmulhausen/gists{/gist_id}",
   				"gravatar_id": "",
   				"html_url": "https://github.com/johndmulhausen",
   				"id": 5439615,
   				"login": "johndmulhausen",
   				"organizations_url": "https://api.github.com/users/johndmulhausen/orgs",
   				"received_events_url": "https://api.github.com/users/johndmulhausen/received_events",
   				"repos_url": "https://api.github.com/users/johndmulhausen/repos",
   				"site_admin": false,
   				"starred_url": "https://api.github.com/users/johndmulhausen/starred{/owner}{/repo}",
   				"subscriptions_url": "https://api.github.com/users/johndmulhausen/subscriptions",
   				"type": "User",
   				"url": "https://api.github.com/users/johndmulhausen"
   			},
   			"number": 507,
   			"patch_url": "https://github.com/kubernetes/kubernetes.github.io/pull/507.patch",
   			"review_comment_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/comments{/number}",
   			"review_comments": 4,
   			"review_comments_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507/comments",
   			"state": "closed",
   			"statuses_url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/statuses/0700c5ca798fce6d1f323ca70baa5ef45e82e491",
   			"title": "Update service doc with AWS ELB SSL annotations",
   			"updated_at": "2016-07-07T00:00:33Z",
   			"url": "https://api.github.com/repos/kubernetes/kubernetes.github.io/pulls/507",
   			"user": {
   				"avatar_url": "https://avatars.githubusercontent.com/u/13481082?v=3",
   				"events_url": "https://api.github.com/users/therc/events{/privacy}",
   				"followers_url": "https://api.github.com/users/therc/followers",
   				"following_url": "https://api.github.com/users/therc/following{/other_user}",
   				"gists_url": "https://api.github.com/users/therc/gists{/gist_id}",
   				"gravatar_id": "",
   				"html_url": "https://github.com/therc",
   				"id": 13481082,
   				"login": "therc",
   				"organizations_url": "https://api.github.com/users/therc/orgs",
   				"received_events_url": "https://api.github.com/users/therc/received_events",
   				"repos_url": "https://api.github.com/users/therc/repos",
   				"site_admin": false,
   				"starred_url": "https://api.github.com/users/therc/starred{/owner}{/repo}",
   				"subscriptions_url": "https://api.github.com/users/therc/subscriptions",
   				"type": "User",
   				"url": "https://api.github.com/users/therc"
   			}
   		}
   	},
   	"public": true,
   	"repo": {
   		"id": 51478266,
   		"name": "kubernetes/kubernetes.github.io",
   		"url": "https://api.github.com/repos/kubernetes/kubernetes.github.io"
   	},
   	"type": "PullRequestEvent"
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569206859



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY

Review comment:
       Why dateTime? We can just treat INT96 as an 8-byte long.
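
For context on the question above: an INT96 value occupies 12 bytes, so it cannot be read as a single 8-byte long. Below is a minimal, dependency-free sketch of the conversion the diff performs. It assumes the layout conventionally used for INT96 timestamps: 8 little-endian bytes of nanoseconds within the day, followed by a 4-byte little-endian Julian day number. The class name `Int96TimestampSketch` and the `encode` helper are hypothetical and exist only for illustration; the constants mirror those in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration class; not part of the PR.
final class Int96TimestampSketch {
  // Julian day number of 1970-01-01, same value as in the diff.
  static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588L;
  static final long MILLIS_PER_DAY = 86_400_000L;
  static final long NANOS_PER_MILLISECOND = 1_000_000L;

  // Converts a 12-byte INT96 timestamp to milliseconds since the Unix epoch.
  // Assumed layout: bytes 0-7 = nanos of day, bytes 8-11 = Julian day (little-endian).
  static long toEpochMillis(byte[] int96) {
    if (int96.length != 12) {
      throw new IllegalArgumentException("INT96 must be 12 bytes, got " + int96.length);
    }
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0);
    long julianDay = buf.getInt(8);
    return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY
        + nanosOfDay / NANOS_PER_MILLISECOND;
  }

  // Test helper (hypothetical): builds the 12-byte representation from its two parts.
  static byte[] encode(long nanosOfDay, int julianDay) {
    return ByteBuffer.allocate(12)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putLong(nanosOfDay)
        .putInt(julianDay)
        .array();
  }

  public static void main(String[] args) {
    // One day after the epoch plus 1.5 ms worth of nanoseconds.
    System.out.println(toEpochMillis(encode(1_500_000L, 2440589)));
  }
}
```

Sub-millisecond precision is truncated by the integer division, which matches the "INT96 (nanoseconds) to LONG (milliseconds)" behaviour stated in the PR description.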






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r578137100



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int fieldIndex) {
+    int valueCount = from.getFieldRepetitionCount(fieldIndex);
+    Type fieldType = from.getType().getType(fieldIndex);
+    if (valueCount == 0) {
+      return null;

Review comment:
       Here we just set all the values, including nulls, into a generic row. That requires looping over every field repetition, because there is no direct access to the definition-level API.
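
The null / scalar / array decision described here can be sketched without the Parquet API. This is an illustration only: `RepetitionSketch` and `collapse` are hypothetical names, and a plain `List` stands in for the values a `Group` holds for one field.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class; not part of the PR.
final class RepetitionSketch {
  // Mirrors the shape of extractValue(Group, int) in the diff:
  // zero repetitions -> null, one repetition -> the value itself,
  // more than one -> an Object[] holding every repetition in order.
  static Object collapse(List<?> repeatedValues) {
    int valueCount = repeatedValues.size();
    if (valueCount == 0) {
      return null;
    }
    if (valueCount == 1) {
      return repeatedValues.get(0);
    }
    Object[] results = new Object[valueCount];
    for (int index = 0; index < valueCount; index++) {
      results[index] = repeatedValues.get(index);
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(collapse(Collections.emptyList()));
    System.out.println(collapse(Collections.singletonList("a")));
    System.out.println(Arrays.toString((Object[]) collapse(Arrays.asList("a", "b"))));
  }
}
```

The caller then stores whatever comes back, null included, under the field name, so absent optional fields still appear in the row.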






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569220408



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);

Review comment:
       Yes, will merge this logic.






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569202894



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {

Review comment:
       How are we using this? Are we extracting this as a list of structs?






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569223477



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
##########
@@ -167,11 +167,62 @@ public boolean equals(Object obj) {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {

Review comment:
       Yes, because when I do a deep comparison with a nested generic row, there will be a list of maps containing lists of maps ...
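
A minimal sketch of why a deep comparison is needed, assuming nested values are held as `Map`, `List` and `Object[]` instances: `Map.equals` compares values with `equals`, and Java arrays inherit identity-based `equals` from `Object`, so two structurally identical rows that contain arrays compare as unequal. `DeepCompareSketch` and `deepEquals` are hypothetical names; this is not the PR's `compareMap`, whose body is not shown in full here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the PR.
final class DeepCompareSketch {
  // Recursively compares maps, lists and object arrays by content.
  static boolean deepEquals(Object left, Object right) {
    if (left == right) {
      return true;
    }
    if (left == null || right == null) {
      return false;
    }
    if (left instanceof Map && right instanceof Map) {
      Map<?, ?> leftMap = (Map<?, ?>) left;
      Map<?, ?> rightMap = (Map<?, ?>) right;
      if (leftMap.size() != rightMap.size()) {
        return false;
      }
      for (Map.Entry<?, ?> entry : leftMap.entrySet()) {
        if (!rightMap.containsKey(entry.getKey())
            || !deepEquals(entry.getValue(), rightMap.get(entry.getKey()))) {
          return false;
        }
      }
      return true;
    }
    if (left instanceof List && right instanceof List) {
      // Reuse the array branch so nested arrays inside lists are handled too.
      return deepEquals(((List<?>) left).toArray(), ((List<?>) right).toArray());
    }
    if (left instanceof Object[] && right instanceof Object[]) {
      Object[] leftArray = (Object[]) left;
      Object[] rightArray = (Object[]) right;
      if (leftArray.length != rightArray.length) {
        return false;
      }
      for (int i = 0; i < leftArray.length; i++) {
        if (!deepEquals(leftArray[i], rightArray[i])) {
          return false;
        }
      }
      return true;
    }
    return left.equals(right);
  }

  public static void main(String[] args) {
    Map<String, Object> first = new HashMap<>();
    first.put("k", new Object[]{1, 2});
    Map<String, Object> second = new HashMap<>();
    second.put("k", new Object[]{1, 2});
    System.out.println("Map.equals: " + first.equals(second));
    System.out.println("deepEquals: " + deepEquals(first, second));
  }
}
```

Primitive arrays such as `byte[]` are not covered by this sketch and would fall through to identity comparison.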






[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569221700



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {

Review comment:
       We are not flattening it; I tried to keep the logic the same as the existing implementation.
   When Pinot indexes this row, it should decide how to treat this Map or List.
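
   To make the distinction concrete, the sketch below shows what a flattening step would do to a nested value, in contrast to the nested `Map` that the extractor leaves in place. It is an illustration only: `FlattenContrast` and `flatten` are hypothetical names, the dotted-key convention is an assumption, and nothing here is Pinot code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a flattening step that the extractor does NOT perform. */
final class FlattenContrast {
  private FlattenContrast() {
  }

  /** Collapses nested maps into a single level, joining keys with '.' (assumed convention). */
  static Map<String, Object> flatten(Map<String, Object> nested) {
    Map<String, Object> flat = new LinkedHashMap<>();
    flattenInto("", nested, flat);
    return flat;
  }

  private static void flattenInto(String prefix, Map<String, Object> nested, Map<String, Object> out) {
    for (Map.Entry<String, Object> entry : nested.entrySet()) {
      String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
      Object value = entry.getValue();
      if (value instanceof Map) {
        // Descend into the child map instead of storing it as a value.
        @SuppressWarnings("unchecked")
        Map<String, Object> child = (Map<String, Object>) value;
        flattenInto(key, child, out);
      } else {
        // Leaf values, including arrays, are stored as-is under the dotted key.
        out.put(key, value);
      }
    }
  }
}
```

   Given a row value such as `{name=a, address={city=Paris, zip=75001}}`, the extractor as described keeps `address` as a nested map, whereas a flattening step like the one sketched would produce the keys `name`, `address.city` and `address.zip`.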




----------------------------------------------------------------


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6525: Adding native parquet record reader support

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6525:
URL: https://github.com/apache/incubator-pinot/pull/6525#discussion_r569210587



##########
File path: pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetAvroRecordReader implements RecordReader {

Review comment:
       This is the current ParquetRecordReader implementation; under the hood it uses the `parquet-avro` (https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html) library to handle the read/write logic.
   
   I just renamed it.




----------------------------------------------------------------