Posted to dev@drill.apache.org by arina-ielchiieva <gi...@git.apache.org> on 2018/04/17 13:18:24 UTC

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

GitHub user arina-ielchiieva opened a pull request:

    https://github.com/apache/drill/pull/1214

    DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to Drill optimizations (filter / limit push down, count to direct scan)
    
    1. Factored out common logic for the Drill parquet reader and the Hive Drill native parquet readers: AbstractParquetGroupScan, AbstractParquetRowGroupScan, AbstractParquetScanBatchCreator.
    2. Rules that previously worked only with ParquetGroupScan can now be applied to any class that extends AbstractParquetGroupScan: DrillFilterItemStarReWriterRule, ParquetPruneScanRule, PruneScanRule.
    3. Hive populates partition values based on information returned from the Hive metastore, while Drill populates partition values based on the path difference between the selection root and the actual file path.
       Previously, ColumnExplorer populated partition values using the Drill approach. Since ColumnExplorer now also populates values for parquet files from Hive tables,
       the `populateImplicitColumns` method logic was changed to populate partition columns based only on the given partition values.
    4. Refactored ParquetPartitionDescriptor to be responsible for populating partition values rather than keeping this logic in the parquet group scan class.
    5. The Metadata class was moved to a separate metadata package (org.apache.drill.exec.store.parquet.metadata). Factored out several inner classes to improve code readability.
    6. Collected all Drill native parquet reader unit tests into one class, TestHiveDrillNativeParquetReader, and added new tests to cover the new functionality.
    7. Reduced excessive logging when parquet file metadata is read.
    8. Added a Drill stopwatch implementation (a wrapper around the Guava stopwatch plus a DummyStopwatch); see the sketch after this description. This helps to save system resources when debug-level logging is not enabled.
    
    Link to Jira - [DRILL-6331](https://issues.apache.org/jira/browse/DRILL-6331).
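
    A minimal sketch of the stopwatch facade mentioned in item 8, for illustration only. It assumes nothing beyond the usage visible in the quoted AbstractParquetScanBatchCreator later in this thread (`Stopwatch.createUnstarted(logger.isDebugEnabled())`, `start()`, `elapsed(TimeUnit)`); the actual org.apache.drill.common.Stopwatch in the pull request may be structured differently:

    ```java
    // Hypothetical sketch -- not the actual org.apache.drill.common.Stopwatch source.
    import java.util.concurrent.TimeUnit;

    public interface Stopwatch {

      /** Returns a Guava-backed stopwatch when timing is enabled, otherwise a no-op dummy. */
      static Stopwatch createUnstarted(boolean enabled) {
        return enabled ? new GuavaStopwatch() : new DummyStopwatch();
      }

      Stopwatch start();

      long elapsed(TimeUnit unit);

      /** Delegates to the Guava stopwatch. */
      class GuavaStopwatch implements Stopwatch {
        private final com.google.common.base.Stopwatch delegate =
            com.google.common.base.Stopwatch.createUnstarted();

        @Override
        public Stopwatch start() {
          delegate.start();
          return this;
        }

        @Override
        public long elapsed(TimeUnit unit) {
          return delegate.elapsed(unit);
        }
      }

      /** Does nothing, avoiding System.nanoTime() calls when debug logging is disabled. */
      class DummyStopwatch implements Stopwatch {
        @Override
        public Stopwatch start() {
          return this;
        }

        @Override
        public long elapsed(TimeUnit unit) {
          return 0;
        }
      }
    }
    ```

    With such a facade the calling code times its work the same way in all cases; only the factory decides whether real timing happens.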

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arina-ielchiieva/drill DRILL-6331

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1214
    
----
commit af5dff61b6b70c4ef70d4a5173aa63f5faa9c2c0
Author: Arina Ielchiieva <ar...@...>
Date:   2018-03-20T18:29:45Z

    DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to Drill optimizations (filter / limit push down, count to direct scan)
    
    1. Factored out common logic for the Drill parquet reader and the Hive Drill native parquet readers: AbstractParquetGroupScan, AbstractParquetRowGroupScan, AbstractParquetScanBatchCreator.
    2. Rules that previously worked only with ParquetGroupScan can now be applied to any class that extends AbstractParquetGroupScan: DrillFilterItemStarReWriterRule, ParquetPruneScanRule, PruneScanRule.
    3. Hive populates partition values based on information returned from the Hive metastore, while Drill populates partition values based on the path difference between the selection root and the actual file path.
       Previously, ColumnExplorer populated partition values using the Drill approach. Since ColumnExplorer now also populates values for parquet files from Hive tables,
       the `populateImplicitColumns` method logic was changed to populate partition columns based only on the given partition values.
    4. Refactored ParquetPartitionDescriptor to be responsible for populating partition values rather than keeping this logic in the parquet group scan class.
    5. The Metadata class was moved to a separate metadata package (org.apache.drill.exec.store.parquet.metadata). Factored out several inner classes to improve code readability.
    6. Collected all Drill native parquet reader unit tests into one class, TestHiveDrillNativeParquetReader, and added new tests to cover the new functionality.
    7. Reduced excessive logging when parquet file metadata is read.
    8. Added a Drill stopwatch implementation (a wrapper around the Guava stopwatch plus a DummyStopwatch). This helps to save system resources when debug-level logging is not enabled.

----


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183566717
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonIgnore;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.ListMultimap;
    +import org.apache.drill.common.expression.ErrorCollector;
    +import org.apache.drill.common.expression.ErrorCollectorImpl;
    +import org.apache.drill.common.expression.ExpressionStringBuilder;
    +import org.apache.drill.common.expression.LogicalExpression;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.expression.ValueExpressions;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
    +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    +import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    +import org.apache.drill.exec.ops.UdfUtilities;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.drill.exec.physical.base.ScanStats;
    +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.dfs.FileSelection;
    +import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
    +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
    +import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
    +import org.apache.drill.exec.store.schedule.AffinityCreator;
    +import org.apache.drill.exec.store.schedule.AssignmentCreator;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +
    +public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
    +
    +  protected List<SchemaPath> columns;
    +  protected List<ReadEntryWithPath> entries;
    +  protected LogicalExpression filter;
    +
    +  protected ParquetTableMetadataBase parquetTableMetadata;
    +  protected List<RowGroupInfo> rowGroupInfos;
    +  protected ListMultimap<Integer, RowGroupInfo> mappings;
    +  protected Set<String> fileSet;
    +
    +  private List<EndpointAffinity> endpointAffinities;
    +  private ParquetGroupScanStatistics parquetGroupScanStatistics;
    +
    +  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
    +    super(userName);
    +    this.columns = columns;
    +    this.entries = entries;
    +    this.filter = filter;
    +  }
    +
    +  // immutable copy constructor
    +  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
    +    super(that);
    +    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
    +    this.parquetTableMetadata = that.parquetTableMetadata;
    +    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
    +    this.filter = that.filter;
    +    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
    +    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
    +    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
    +    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
    +    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
    +  }
    +
    +  @JsonProperty
    +  public List<SchemaPath> getColumns() {
    +    return columns;
    +  }
    +
    +  @JsonProperty
    +  public List<ReadEntryWithPath> getEntries() {
    +    return entries;
    +  }
    +
    +  @JsonIgnore
    +  @Override
    +  public Collection<String> getFiles() {
    +    return fileSet;
    +  }
    +
    +  @Override
    +  public boolean hasFiles() {
    +    return true;
    +  }
    +
    +  @Override
    +  public boolean canPushdownProjects(List<SchemaPath> columns) {
    +    return true;
    +  }
    +
    +  /**
    +   * Return column value count for the specified column.
    +   * If does not contain such column, return 0.
    +   * Is used when applying convert to direct scan rule.
    +   *
    +   * @param column column schema path
    +   * @return column value count
    +   */
    +  @Override
    +  public long getColumnValueCount(SchemaPath column) {
    +    return parquetGroupScanStatistics.getColumnValueCount(column);
    +  }
    +
    +  /**
    +   * Calculates the affinity each endpoint has for this scan,
    +   * by adding up the affinity each endpoint has for each rowGroup.
    +   *
    +   * @return a list of EndpointAffinity objects
    +   */
    +  @Override
    +  public List<EndpointAffinity> getOperatorAffinity() {
    +    return endpointAffinities;
    +  }
    +
    +  @Override
    +  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
    +    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
    +  }
    +
    +  @Override
    +  public int getMaxParallelizationWidth() {
    +    return rowGroupInfos.size();
    +  }
    +
    +  @Override
    +  public String getDigest() {
    +    return toString();
    +  }
    +
    +  @Override
    +  public ScanStats getScanStats() {
    +    int columnCount = columns == null ? 20 : columns.size();
    +    long rowCount = parquetGroupScanStatistics.getRowCount();
    +    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
    +    logger.trace("Drill parquet scan statistics: {}", scanStats);
    +    return scanStats;
    +  }
    +
    +  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
    +    assert minorFragmentId < mappings.size() : String
    +        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
    +            mappings.size(), minorFragmentId);
    +
    +    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
    +
    +    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
    +        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
    +
    +    List<RowGroupReadEntry> entries = new ArrayList<>();
    +    for (RowGroupInfo rgi : rowGroupsForMinor) {
    +      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
    +      entries.add(entry);
    +    }
    +    return entries;
    +  }
    +
    +  // filter push down methods block start
    +  @JsonProperty
    +  @Override
    +  public LogicalExpression getFilter() {
    +    return filter;
    +  }
    +
    +  public void setFilter(LogicalExpression filter) {
    +    this.filter = filter;
    +  }
    +
    +  @Override
    +  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
    +                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
    +
    +    if (rowGroupInfos.size() == 1 ||
    +        ! (parquetTableMetadata.isRowGroupPrunable()) ||
    +        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
    +        ) {
    +      // Stop pruning for 3 cases:
    +      //    -  1 single parquet file,
    +      //    -  metadata does not have proper format to support row group level filter pruning,
    +      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
    +      return null;
    +    }
    +
    +    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
    +
    +    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
    +    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
    +
    +    ParquetFilterPredicate filterPredicate = null;
    +
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
    +      List<String> partitionValues = getPartitionValues(rowGroup);
    +      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
    +
    +      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
    +          parquetTableMetadata,
    +          rowGroup.getColumns(),
    +          implicitColValues);
    +
    +      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
    +
    +      if (filterPredicate == null) {
    +        ErrorCollector errorCollector = new ErrorCollectorImpl();
    +        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
    +            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
    +
    +        if (errorCollector.hasErrors()) {
    +          logger.error("{} error(s) encountered when materialize filter expression : {}",
    +              errorCollector.getErrorCount(), errorCollector.toErrorString());
    +          return null;
    +        }
    +        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    +
    +        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
    +        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
    +            materializedFilter, constantBoundaries, udfUtilities);
    +
    +        if (filterPredicate == null) {
    +          return null;
    +        }
    +      }
    +
    +      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
    +        continue;
    +      }
    +
    +      qualifiedRGs.add(rowGroup);
    +      qualifiedFilePath.add(rowGroup.getPath());  // TODO : optimize when 1 file contains m row groups.
    +    }
    +
    +    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
    +      // There is no reduction of rowGroups. Return the original groupScan.
    +      logger.debug("applyFilter does not have any pruning!");
    +      return null;
    +    } else if (qualifiedFilePath.size() == 0) {
    +      logger.debug("All rowgroups have been filtered out. Add back one to get schema from scannner");
    +      RowGroupInfo rg = rowGroupInfos.iterator().next();
    +      qualifiedFilePath.add(rg.getPath());
    +      qualifiedRGs.add(rg);
    +    }
    +
    +    logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
    +
    +    try {
    +      AbstractParquetGroupScan cloneGroupScan = cloneWithFileSelection(qualifiedFilePath);
    +      cloneGroupScan.rowGroupInfos = qualifiedRGs;
    +      cloneGroupScan.parquetGroupScanStatistics.collect(cloneGroupScan.rowGroupInfos, cloneGroupScan.parquetTableMetadata);
    +      return cloneGroupScan;
    +
    +    } catch (IOException e) {
    +      logger.warn("Could not apply filter prune due to Exception : {}", e);
    +      return null;
    +    }
    +  }
    +  // filter push down methods block end
    +
    +  // limit push down methods start
    +  @Override
    +  public boolean supportsLimitPushdown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public GroupScan applyLimit(int maxRecords) {
    +    maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
    +    // further optimization : minimize # of files chosen, or the affinity of files chosen.
    +
    +    // Calculate number of rowGroups to read based on maxRecords and update
    +    // number of records to read for each of those rowGroups.
    +    int index = updateRowGroupInfo(maxRecords);
    +
    +    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
    +        .map(ReadEntryWithPath::getPath)
    +        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
    +
    +    // If there is no change in fileSet, no need to create new groupScan.
    +    if (filePaths.size() == fileSet.size() ) {
    +      // There is no reduction of rowGroups. Return the original groupScan.
    +      logger.debug("applyLimit() does not apply!");
    +      return null;
    +    }
    +
    +    logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), filePaths.size());
    +
    +    try {
    +      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
    +      newScan.updateRowGroupInfo(maxRecords);
    +      return newScan;
    +    } catch (IOException e) {
    +      logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
    +      return null;
    +    }
    +  }
    +  // limit push down methods end
    +
    +  // partition pruning methods start
    +  @Override
    +  public List<SchemaPath> getPartitionColumns() {
    +    return parquetGroupScanStatistics.getPartitionColumns();
    +  }
    +
    +  @JsonIgnore
    +  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
    +    return parquetGroupScanStatistics.getTypeForColumn(schemaPath);
    +  }
    +
    +  @JsonIgnore
    +  public <T> T getPartitionValue(String path, SchemaPath column, Class<T> clazz) {
    +    return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column));
    +  }
    +
    +  @JsonIgnore
    +  public Set<String> getFileSet() {
    +    return fileSet;
    +  }
    +  // partition pruning methods end
    +
    +  // helper method used for partition pruning and filter push down
    +  @Override
    +  public void modifyFileSelection(FileSelection selection) {
    +    List<String> files = selection.getFiles();
    +    fileSet = new HashSet<>(files);
    +    entries = new ArrayList<>(files.size());
    +
    +    entries.addAll(files.stream()
    +        .map(ReadEntryWithPath::new)
    +        .collect(Collectors.toList()));
    +
    +    rowGroupInfos = rowGroupInfos.stream()
    +        .filter(rowGroupInfo -> fileSet.contains(rowGroupInfo.getPath()))
    +        .collect(Collectors.toList());
    +  }
    +
    +
    +  // protected methods block
    +  protected void init() throws IOException {
    +    initInternal();
    +
    +    assert parquetTableMetadata != null;
    +
    +    if (fileSet == null) {
    +      fileSet = new HashSet<>();
    +      fileSet.addAll(parquetTableMetadata.getFiles().stream()
    +          .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
    +          .collect(Collectors.toList()));
    --- End diff --
    
    ```
          fileSet = parquetTableMetadata.getFiles().stream()
              .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
              .collect(Collectors.toSet());
    ```
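
    The suggestion collects the stream straight into a Set instead of collecting into an intermediate List and then copying it into a HashSet, which drops one temporary collection and one extra pass over the file names. A self-contained illustration of the same pattern on plain strings (hypothetical paths, not Drill code):

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class CollectToSetExample {
      public static void main(String[] args) {
        List<String> paths = Arrays.asList("/t/1.parquet", "/t/2.parquet", "/t/1.parquet");

        // One pass; duplicates are dropped while collecting into the Set.
        Set<String> fileSet = paths.stream().collect(Collectors.toSet());

        System.out.println(fileSet); // two distinct paths
      }
    }
    ```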


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183980928
  
    --- Diff: contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java ---
    @@ -0,0 +1,247 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to you under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +package org.apache.drill.exec;
    +
    +import org.apache.drill.PlanTestBase;
    +import org.apache.drill.categories.HiveStorageTest;
    +import org.apache.drill.categories.SlowTest;
    +import org.apache.drill.common.exceptions.UserRemoteException;
    +import org.apache.drill.exec.hive.HiveTestBase;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.hamcrest.CoreMatchers;
    +import org.joda.time.DateTime;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.ExpectedException;
    +
    +import java.math.BigDecimal;
    +import java.sql.Date;
    +import java.sql.Timestamp;
    +
    +import static org.hamcrest.CoreMatchers.containsString;
    +import static org.junit.Assert.assertEquals;
    +
    +@Category({SlowTest.class, HiveStorageTest.class})
    +public class TestHiveDrillNativeParquetReader extends HiveTestBase {
    +
    +  @BeforeClass
    +  public static void init() {
    +    setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, true);
    +    setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
    +  }
    +
    +  @AfterClass
    +  public static void cleanup() {
    +    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
    +    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
    +  }
    +
    +  @Rule
    +  public ExpectedException thrown = ExpectedException.none();
    +
    +  @Test
    +  public void testFilterPushDownForManagedTable() throws Exception {
    +    String query = "select * from hive.kv_native where key > 1";
    +
    +    int actualRowCount = testSql(query);
    +    assertEquals("Expected and actual row count should match", 2, actualRowCount);
    +
    +    testPlanMatchingPatterns(query,
    +        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
    +  }
    +
    +  @Test
    +  public void testFilterPushDownForExternalTable() throws Exception {
    +    String query = "select * from hive.kv_native_ext where key = 1";
    +
    +    int actualRowCount = testSql(query);
    +    assertEquals("Expected and actual row count should match", 1, actualRowCount);
    +
    +    testPlanMatchingPatterns(query,
    +        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
    --- End diff --
    
    Agreed, replaced with null. Not adding this method, to avoid merge conflicts.
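
    For reference, a sketch of what that replacement presumably looks like in the quoted test, assuming the third argument of testPlanMatchingPatterns is the array of excluded plan patterns (as the empty array above suggests):

    ```java
    testPlanMatchingPatterns(query,
        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
    ```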


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981543
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java ---
    @@ -40,31 +36,26 @@
     import com.fasterxml.jackson.annotation.JsonProperty;
     import com.fasterxml.jackson.annotation.JsonTypeName;
     import com.google.common.base.Preconditions;
    -import com.google.common.collect.Iterators;
    +import org.apache.hadoop.conf.Configuration;
     
     // Class containing information for reading a single parquet row group form HDFS
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183253252
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java ---
    @@ -40,31 +36,26 @@
     import com.fasterxml.jackson.annotation.JsonProperty;
     import com.fasterxml.jackson.annotation.JsonTypeName;
     import com.google.common.base.Preconditions;
    -import com.google.common.collect.Iterators;
    +import org.apache.hadoop.conf.Configuration;
     
     // Class containing information for reading a single parquet row group form HDFS
    --- End diff --
    
    form -> from


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183647695
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java ---
    @@ -166,25 +171,43 @@ public boolean matches(RelOptRuleCall call) {
       @Override
       public void onMatch(RelOptRuleCall call) {
         try {
    -      final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
    +      final DrillScanRel hiveScanRel = call.rel(0);
           final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
     
           final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
           final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
     
           final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
    -      checkForUnsupportedDataTypes(hiveTable);
    +      final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
    +
    +      final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
    --- End diff --
    
    line break


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183983326
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonIgnore;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.ListMultimap;
    +import org.apache.drill.common.expression.ErrorCollector;
    +import org.apache.drill.common.expression.ErrorCollectorImpl;
    +import org.apache.drill.common.expression.ExpressionStringBuilder;
    +import org.apache.drill.common.expression.LogicalExpression;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.expression.ValueExpressions;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
    +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    +import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    +import org.apache.drill.exec.ops.UdfUtilities;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.drill.exec.physical.base.ScanStats;
    +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.dfs.FileSelection;
    +import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
    +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
    +import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
    +import org.apache.drill.exec.store.schedule.AffinityCreator;
    +import org.apache.drill.exec.store.schedule.AssignmentCreator;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +
    +public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
    +
    +  protected List<SchemaPath> columns;
    +  protected List<ReadEntryWithPath> entries;
    +  protected LogicalExpression filter;
    +
    +  protected ParquetTableMetadataBase parquetTableMetadata;
    +  protected List<RowGroupInfo> rowGroupInfos;
    +  protected ListMultimap<Integer, RowGroupInfo> mappings;
    +  protected Set<String> fileSet;
    +
    +  private List<EndpointAffinity> endpointAffinities;
    +  private ParquetGroupScanStatistics parquetGroupScanStatistics;
    +
    +  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
    +    super(userName);
    +    this.columns = columns;
    +    this.entries = entries;
    +    this.filter = filter;
    +  }
    +
    +  // immutable copy constructor
    +  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
    +    super(that);
    +    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
    +    this.parquetTableMetadata = that.parquetTableMetadata;
    +    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
    +    this.filter = that.filter;
    +    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
    +    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
    +    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
    +    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
    +    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
    +  }
    +
    +  @JsonProperty
    +  public List<SchemaPath> getColumns() {
    +    return columns;
    +  }
    +
    +  @JsonProperty
    +  public List<ReadEntryWithPath> getEntries() {
    +    return entries;
    +  }
    +
    +  @JsonIgnore
    +  @Override
    +  public Collection<String> getFiles() {
    +    return fileSet;
    +  }
    +
    +  @Override
    +  public boolean hasFiles() {
    +    return true;
    +  }
    +
    +  @Override
    +  public boolean canPushdownProjects(List<SchemaPath> columns) {
    +    return true;
    +  }
    +
    +  /**
    +   * Return column value count for the specified column.
    +   * If does not contain such column, return 0.
    +   * Is used when applying convert to direct scan rule.
    +   *
    +   * @param column column schema path
    +   * @return column value count
    +   */
    +  @Override
    +  public long getColumnValueCount(SchemaPath column) {
    +    return parquetGroupScanStatistics.getColumnValueCount(column);
    +  }
    +
    +  /**
    +   * Calculates the affinity each endpoint has for this scan,
    +   * by adding up the affinity each endpoint has for each rowGroup.
    +   *
    +   * @return a list of EndpointAffinity objects
    +   */
    +  @Override
    +  public List<EndpointAffinity> getOperatorAffinity() {
    +    return endpointAffinities;
    +  }
    +
    +  @Override
    +  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
    +    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
    +  }
    +
    +  @Override
    +  public int getMaxParallelizationWidth() {
    +    return rowGroupInfos.size();
    +  }
    +
    +  @Override
    +  public String getDigest() {
    +    return toString();
    +  }
    +
    +  @Override
    +  public ScanStats getScanStats() {
    +    int columnCount = columns == null ? 20 : columns.size();
    +    long rowCount = parquetGroupScanStatistics.getRowCount();
    +    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
    +    logger.trace("Drill parquet scan statistics: {}", scanStats);
    +    return scanStats;
    +  }
    +
    +  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
    +    assert minorFragmentId < mappings.size() : String
    +        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
    +            mappings.size(), minorFragmentId);
    +
    +    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
    +
    +    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
    +        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
    +
    +    List<RowGroupReadEntry> entries = new ArrayList<>();
    +    for (RowGroupInfo rgi : rowGroupsForMinor) {
    +      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
    +      entries.add(entry);
    +    }
    +    return entries;
    +  }
    +
    +  // filter push down methods block start
    +  @JsonProperty
    +  @Override
    +  public LogicalExpression getFilter() {
    +    return filter;
    +  }
    +
    +  public void setFilter(LogicalExpression filter) {
    +    this.filter = filter;
    +  }
    +
    +  @Override
    +  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
    +                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
    +
    +    if (rowGroupInfos.size() == 1 ||
    +        ! (parquetTableMetadata.isRowGroupPrunable()) ||
    +        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
    +        ) {
    +      // Stop pruning for 3 cases:
    +      //    -  1 single parquet file,
    +      //    -  metadata does not have proper format to support row group level filter pruning,
    +      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
    +      return null;
    +    }
    +
    +    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
    +
    +    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
    +    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
    +
    +    ParquetFilterPredicate filterPredicate = null;
    +
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
    +      List<String> partitionValues = getPartitionValues(rowGroup);
    +      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
    +
    +      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
    +          parquetTableMetadata,
    +          rowGroup.getColumns(),
    +          implicitColValues);
    +
    +      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
    +
    +      if (filterPredicate == null) {
    +        ErrorCollector errorCollector = new ErrorCollectorImpl();
    +        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
    +            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
    +
    +        if (errorCollector.hasErrors()) {
    +          logger.error("{} error(s) encountered when materialize filter expression : {}",
    +              errorCollector.getErrorCount(), errorCollector.toErrorString());
    +          return null;
    +        }
    +        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    +
    +        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
    +        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
    +            materializedFilter, constantBoundaries, udfUtilities);
    +
    +        if (filterPredicate == null) {
    +          return null;
    +        }
    +      }
    +
    +      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
    +        continue;
    +      }
    +
    +      qualifiedRGs.add(rowGroup);
    +      qualifiedFilePath.add(rowGroup.getPath());  // TODO : optimize when 1 file contains m row groups.
    +    }
    +
    +    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
    +      // There is no reduction of rowGroups. Return the original groupScan.
    +      logger.debug("applyFilter does not have any pruning!");
    +      return null;
    +    } else if (qualifiedFilePath.size() == 0) {
    +      logger.debug("All rowgroups have been filtered out. Add back one to get schema from scannner");
    +      RowGroupInfo rg = rowGroupInfos.iterator().next();
    +      qualifiedFilePath.add(rg.getPath());
    +      qualifiedRGs.add(rg);
    +    }
    +
    +    logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
    --- End diff --
    
    Changed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981425
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.google.common.base.Functions;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.common.Stopwatch;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.ops.ExecutorFragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.ScanBatch;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
    +import org.apache.drill.exec.store.parquet2.DrillParquetReader;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.schema.Type;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractParquetScanBatchCreator {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
    +
    +  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
    +  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
    +  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
    +
    +  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
    +    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
    +
    +    if (!columnExplorer.isStarQuery()) {
    +      rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
    +      rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
    +    }
    +
    +    boolean useAsyncPageReader =
    +        context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
    +
    +    AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, useAsyncPageReader);
    +
    +    // keep footers in a map to avoid re-reading them
    +    Map<String, ParquetMetadata> footers = new HashMap<>();
    +    List<RecordReader> readers = new LinkedList<>();
    +    List<Map<String, String>> implicitColumns = new ArrayList<>();
    +    Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
    +    for(RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
    +      /*
    +      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
    +      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
    +      we should add more information to the RowGroupInfo that will be populated upon the first read to
    +      provide the reader with all of th file meta-data it needs
    +      These fields will be added to the constructor below
    +      */
    +      try {
    +        Stopwatch timer = Stopwatch.createUnstarted(logger.isDebugEnabled());
    +        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
    +        if (!footers.containsKey(rowGroup.getPath())) {
    +          timer.start();
    +
    +          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath());
    +          long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
    +          logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
    +          footers.put(rowGroup.getPath(), footer);
    +        }
    +        ParquetMetadata footer = footers.get(rowGroup.getPath());
    +
    +        boolean autoCorrectCorruptDates = rowGroupScan.areCorruptDatesAutoCorrected();
    +        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(),
    --- End diff --
    
    Done.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183982099
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java ---
    @@ -0,0 +1,95 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to you under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.drill.exec.store.schedule.CompleteWork;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +
    +import java.util.List;
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
    +
    +public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
    +
    +    private EndpointByteMap byteMap;
    +    private int rowGroupIndex;
    +    private List<? extends ColumnMetadata> columns;
    +    private long rowCount;  // rowCount = -1 indicates to include all rows.
    +    private long numRecordsToRead;
    +
    +    @JsonCreator
    +    public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
    +                        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) {
    +      super(path, start, length);
    +      this.rowGroupIndex = rowGroupIndex;
    +      this.rowCount = rowCount;
    +      this.numRecordsToRead = rowCount;
    +    }
    +
    +    public RowGroupReadEntry getRowGroupReadEntry() {
    +      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(),
    +                                   this.rowGroupIndex, this.getNumRecordsToRead());
    +    }
    +
    +    public int getRowGroupIndex() {
    +      return this.rowGroupIndex;
    +    }
    +
    +    @Override
    +    public int compareTo(CompleteWork o) {
    +      return Long.compare(getTotalBytes(), o.getTotalBytes());
    +    }
    +
    +    @Override
    +    public long getTotalBytes() {
    +      return this.getLength();
    +    }
    +
    +    @Override
    +    public EndpointByteMap getByteMap() {
    +      return byteMap;
    +    }
    +
    +    public long getNumRecordsToRead() {
    +      return numRecordsToRead;
    +    }
    +
    +    public void setNumRecordsToRead(long numRecords) {
    +      numRecordsToRead = numRecords;
    +    }
    +
    +    public void setEndpointByteMap(EndpointByteMap byteMap) {
    +      this.byteMap = byteMap;
    +    }
    +
    +    public long getRowCount() {
    +      return rowCount;
    +    }
    +
    +    public List<? extends ColumnMetadata> getColumns() {
    +      return columns;
    +    }
    +
    +    public void setColumns(List<? extends ColumnMetadata> columns) {
    +      this.columns = columns;
    +    }
    +
    +  }
    --- End diff --
    
    Done.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183551693
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonIgnore;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.ListMultimap;
    +import org.apache.drill.common.expression.ErrorCollector;
    +import org.apache.drill.common.expression.ErrorCollectorImpl;
    +import org.apache.drill.common.expression.ExpressionStringBuilder;
    +import org.apache.drill.common.expression.LogicalExpression;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.expression.ValueExpressions;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
    +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    +import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    +import org.apache.drill.exec.ops.UdfUtilities;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.drill.exec.physical.base.ScanStats;
    +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.dfs.FileSelection;
    +import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
    +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
    +import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
    +import org.apache.drill.exec.store.schedule.AffinityCreator;
    +import org.apache.drill.exec.store.schedule.AssignmentCreator;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +
    +public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
    +
    +  protected List<SchemaPath> columns;
    +  protected List<ReadEntryWithPath> entries;
    +  protected LogicalExpression filter;
    +
    +  protected ParquetTableMetadataBase parquetTableMetadata;
    +  protected List<RowGroupInfo> rowGroupInfos;
    +  protected ListMultimap<Integer, RowGroupInfo> mappings;
    +  protected Set<String> fileSet;
    +
    +  private List<EndpointAffinity> endpointAffinities;
    +  private ParquetGroupScanStatistics parquetGroupScanStatistics;
    +
    +  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
    +    super(userName);
    +    this.columns = columns;
    +    this.entries = entries;
    +    this.filter = filter;
    +  }
    +
    +  // immutable copy constructor
    +  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
    +    super(that);
    +    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
    +    this.parquetTableMetadata = that.parquetTableMetadata;
    +    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
    +    this.filter = that.filter;
    +    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
    +    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
    +    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
    +    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
    +    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
    +  }
    +
    +  @JsonProperty
    +  public List<SchemaPath> getColumns() {
    +    return columns;
    +  }
    +
    +  @JsonProperty
    +  public List<ReadEntryWithPath> getEntries() {
    +    return entries;
    +  }
    +
    +  @JsonIgnore
    +  @Override
    +  public Collection<String> getFiles() {
    +    return fileSet;
    +  }
    +
    +  @Override
    +  public boolean hasFiles() {
    +    return true;
    +  }
    +
    +  @Override
    +  public boolean canPushdownProjects(List<SchemaPath> columns) {
    +    return true;
    +  }
    +
    +  /**
    +   * Return column value count for the specified column.
    +   * If does not contain such column, return 0.
    +   * Is used when applying convert to direct scan rule.
    +   *
    +   * @param column column schema path
    +   * @return column value count
    +   */
    +  @Override
    +  public long getColumnValueCount(SchemaPath column) {
    +    return parquetGroupScanStatistics.getColumnValueCount(column);
    +  }
    +
    +  /**
    +   * Calculates the affinity each endpoint has for this scan,
    +   * by adding up the affinity each endpoint has for each rowGroup.
    +   *
    +   * @return a list of EndpointAffinity objects
    +   */
    +  @Override
    +  public List<EndpointAffinity> getOperatorAffinity() {
    +    return endpointAffinities;
    +  }
    +
    +  @Override
    +  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
    +    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
    +  }
    +
    +  @Override
    +  public int getMaxParallelizationWidth() {
    +    return rowGroupInfos.size();
    +  }
    +
    +  @Override
    +  public String getDigest() {
    +    return toString();
    +  }
    +
    +  @Override
    +  public ScanStats getScanStats() {
    +    int columnCount = columns == null ? 20 : columns.size();
    +    long rowCount = parquetGroupScanStatistics.getRowCount();
    +    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
    +    logger.trace("Drill parquet scan statistics: {}", scanStats);
    +    return scanStats;
    +  }
    +
    +  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
    +    assert minorFragmentId < mappings.size() : String
    +        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
    +            mappings.size(), minorFragmentId);
    +
    +    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
    +
    +    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
    +        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
    +
    +    List<RowGroupReadEntry> entries = new ArrayList<>();
    +    for (RowGroupInfo rgi : rowGroupsForMinor) {
    +      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
    +      entries.add(entry);
    +    }
    +    return entries;
    +  }
    +
    +  // filter push down methods block start
    +  @JsonProperty
    +  @Override
    +  public LogicalExpression getFilter() {
    +    return filter;
    +  }
    +
    +  public void setFilter(LogicalExpression filter) {
    +    this.filter = filter;
    +  }
    +
    +  @Override
    +  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
    +                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
    +
    +    if (rowGroupInfos.size() == 1 ||
    +        ! (parquetTableMetadata.isRowGroupPrunable()) ||
    +        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
    +        ) {
    +      // Stop pruning for 3 cases:
    +      //    -  1 single parquet file,
    +      //    -  metadata does not have proper format to support row group level filter pruning,
    +      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
    +      return null;
    +    }
    +
    +    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
    +
    +    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
    +    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
    +
    +    ParquetFilterPredicate filterPredicate = null;
    +
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
    +      List<String> partitionValues = getPartitionValues(rowGroup);
    +      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
    +
    +      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
    +          parquetTableMetadata,
    +          rowGroup.getColumns(),
    +          implicitColValues);
    +
    +      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
    +
    +      if (filterPredicate == null) {
    +        ErrorCollector errorCollector = new ErrorCollectorImpl();
    +        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
    +            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
    +
    +        if (errorCollector.hasErrors()) {
    +          logger.error("{} error(s) encountered when materialize filter expression : {}",
    +              errorCollector.getErrorCount(), errorCollector.toErrorString());
    +          return null;
    +        }
    +        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    --- End diff --
    
    Looks like it is a useful logger statement. Uncomment it?
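
    If it is restored, one option is to guard it so the string rendering is skipped when debug logging is off (just a sketch, reusing the variables from the diff above):
    ```
    if (logger.isDebugEnabled()) {
      logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    }
    ```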


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183909850
  
    --- Diff: common/src/main/java/org/apache/drill/common/Stopwatch.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.common;
    +
    +import com.google.common.base.Ticker;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Helper that creates a stopwatch based on whether debug level is enabled.
    --- End diff --
    
    Do we really need this? In general we have (or should have) used Stopwatch to track metrics and/or performance bottlenecks in production, and in neither case do we want to enable debug logging.
    Also, for debugging performance issues (I see that the places you've changed to use this Stopwatch are places where we encountered performance issues), would it be better to use
    ```
    Stopwatch timer;
    if (logger.isDebugEnabled()) {
      timer = Stopwatch.createStarted();
    }
    ```
    More verbose, but guaranteed to be optimized away by the JVM.
    Not insisting that we change this, BTW.
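
    A slightly fuller sketch of that pattern, with the elapsed-time read behind the same guard (assumes Guava's Stopwatch and an SLF4J logger; the class and `doRead()` are just placeholders, not actual Drill code):
    ```
    import com.google.common.base.Stopwatch;
    import java.util.concurrent.TimeUnit;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class GuardedTiming {
      private static final Logger logger = LoggerFactory.getLogger(GuardedTiming.class);

      void readWithOptionalTiming() {
        // Only allocate and start the stopwatch when debug logging is actually on.
        Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;

        doRead(); // placeholder for the work being timed, e.g. reading a parquet footer

        if (timer != null) {
          logger.debug("Read took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
        }
      }

      private void doRead() { /* the measured work */ }
    }
    ```
    Since the elapsed value is only ever read inside the same isDebugEnabled() guard, all stopwatch cost stays behind one cheap boolean check when debug is off.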


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981672
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java ---
    @@ -0,0 +1,95 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183983280
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonIgnore;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.ListMultimap;
    +import org.apache.drill.common.expression.ErrorCollector;
    +import org.apache.drill.common.expression.ErrorCollectorImpl;
    +import org.apache.drill.common.expression.ExpressionStringBuilder;
    +import org.apache.drill.common.expression.LogicalExpression;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.expression.ValueExpressions;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
    +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    +import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    +import org.apache.drill.exec.ops.UdfUtilities;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.drill.exec.physical.base.ScanStats;
    +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.dfs.FileSelection;
    +import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
    +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
    +import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
    +import org.apache.drill.exec.store.schedule.AffinityCreator;
    +import org.apache.drill.exec.store.schedule.AssignmentCreator;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +
    +public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
    +
    +  protected List<SchemaPath> columns;
    +  protected List<ReadEntryWithPath> entries;
    +  protected LogicalExpression filter;
    +
    +  protected ParquetTableMetadataBase parquetTableMetadata;
    +  protected List<RowGroupInfo> rowGroupInfos;
    +  protected ListMultimap<Integer, RowGroupInfo> mappings;
    +  protected Set<String> fileSet;
    +
    +  private List<EndpointAffinity> endpointAffinities;
    +  private ParquetGroupScanStatistics parquetGroupScanStatistics;
    +
    +  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
    +    super(userName);
    +    this.columns = columns;
    +    this.entries = entries;
    +    this.filter = filter;
    +  }
    +
    +  // immutable copy constructor
    +  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
    +    super(that);
    +    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
    +    this.parquetTableMetadata = that.parquetTableMetadata;
    +    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
    +    this.filter = that.filter;
    +    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
    +    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
    +    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
    +    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
    +    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
    +  }
    +
    +  @JsonProperty
    +  public List<SchemaPath> getColumns() {
    +    return columns;
    +  }
    +
    +  @JsonProperty
    +  public List<ReadEntryWithPath> getEntries() {
    +    return entries;
    +  }
    +
    +  @JsonIgnore
    +  @Override
    +  public Collection<String> getFiles() {
    +    return fileSet;
    +  }
    +
    +  @Override
    +  public boolean hasFiles() {
    +    return true;
    +  }
    +
    +  @Override
    +  public boolean canPushdownProjects(List<SchemaPath> columns) {
    +    return true;
    +  }
    +
    +  /**
    +   * Return column value count for the specified column.
    +   * If does not contain such column, return 0.
    +   * Is used when applying convert to direct scan rule.
    +   *
    +   * @param column column schema path
    +   * @return column value count
    +   */
    +  @Override
    +  public long getColumnValueCount(SchemaPath column) {
    +    return parquetGroupScanStatistics.getColumnValueCount(column);
    +  }
    +
    +  /**
    +   * Calculates the affinity each endpoint has for this scan,
    +   * by adding up the affinity each endpoint has for each rowGroup.
    +   *
    +   * @return a list of EndpointAffinity objects
    +   */
    +  @Override
    +  public List<EndpointAffinity> getOperatorAffinity() {
    +    return endpointAffinities;
    +  }
    +
    +  @Override
    +  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
    +    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
    +  }
    +
    +  @Override
    +  public int getMaxParallelizationWidth() {
    +    return rowGroupInfos.size();
    +  }
    +
    +  @Override
    +  public String getDigest() {
    +    return toString();
    +  }
    +
    +  @Override
    +  public ScanStats getScanStats() {
    +    int columnCount = columns == null ? 20 : columns.size();
    +    long rowCount = parquetGroupScanStatistics.getRowCount();
    +    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
    +    logger.trace("Drill parquet scan statistics: {}", scanStats);
    +    return scanStats;
    +  }
    +
    +  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
    +    assert minorFragmentId < mappings.size() : String
    +        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
    +            mappings.size(), minorFragmentId);
    +
    +    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
    +
    +    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
    +        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
    +
    +    List<RowGroupReadEntry> entries = new ArrayList<>();
    +    for (RowGroupInfo rgi : rowGroupsForMinor) {
    +      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
    +      entries.add(entry);
    +    }
    +    return entries;
    +  }
    +
    +  // filter push down methods block start
    +  @JsonProperty
    +  @Override
    +  public LogicalExpression getFilter() {
    +    return filter;
    +  }
    +
    +  public void setFilter(LogicalExpression filter) {
    +    this.filter = filter;
    +  }
    +
    +  @Override
    +  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
    +                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
    +
    +    if (rowGroupInfos.size() == 1 ||
    +        ! (parquetTableMetadata.isRowGroupPrunable()) ||
    +        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
    +        ) {
    +      // Stop pruning for 3 cases:
    +      //    -  1 single parquet file,
    +      //    -  metadata does not have proper format to support row group level filter pruning,
    +      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
    +      return null;
    +    }
    +
    +    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
    +
    +    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
    +    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
    +
    +    ParquetFilterPredicate filterPredicate = null;
    +
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
    +      List<String> partitionValues = getPartitionValues(rowGroup);
    +      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
    +
    +      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
    +          parquetTableMetadata,
    +          rowGroup.getColumns(),
    +          implicitColValues);
    +
    +      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
    +
    +      if (filterPredicate == null) {
    +        ErrorCollector errorCollector = new ErrorCollectorImpl();
    +        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
    +            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
    +
    +        if (errorCollector.hasErrors()) {
    +          logger.error("{} error(s) encountered when materialize filter expression : {}",
    +              errorCollector.getErrorCount(), errorCollector.toErrorString());
    +          return null;
    +        }
    +        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    --- End diff --
    
    Done.


---

[GitHub] drill issue #1214: DRILL-6331: Revisit Hive Drill native parquet implementat...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1214
  
    @parthchandra added two new commits:
    1. Reverted the custom Stopwatch implementation and used logger checks instead.
    2. Allowed creating several non-tracking file systems but only one tracking file system per operator context (details in the prior discussion).


---

[GitHub] drill issue #1214: DRILL-6331: Revisit Hive Drill native parquet implementat...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1214
  
    When moving files around, please preserve the modification history of the file.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183633517
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java ---
    @@ -156,43 +157,74 @@ public static boolean isPartitionColumn(String partitionDesignator, String path)
       }
     
       /**
    -   * Compares selection root and actual file path to determine partition columns values.
    -   * Adds implicit file columns according to columns list.
    +   * Creates map with implicit columns where key is column name, value is columns actual value.
    +   * This map contains partition and implicit file columns (if requested).
    +   * Partition columns names are formed based in partition designator and value index.
        *
    -   * @return map with columns names as keys and their values
    +   * @param filePath file path, used to populate file implicit columns
    +   * @param partitionValues list of partition values
    +   * @param includeFileImplicitColumns if file implicit columns should be included into the result
    +   * @return implicit columns map
        */
    -  public Map<String, String> populateImplicitColumns(FileWork work, String selectionRoot) {
    -    return populateImplicitColumns(work.getPath(), selectionRoot);
    -  }
    +  public Map<String, String> populateImplicitColumns(String filePath,
    +                                                     List<String> partitionValues,
    +                                                     boolean includeFileImplicitColumns) {
    +    Map<String, String> implicitValues = new LinkedHashMap<>();
     
    -  /**
    -   * Compares selection root and actual file path to determine partition columns values.
    -   * Adds implicit file columns according to columns list.
    -   *
    -   * @return map with columns names as keys and their values
    -   */
    -  public Map<String, String> populateImplicitColumns(String filePath, String selectionRoot) {
    -    Map<String, String> implicitValues = Maps.newLinkedHashMap();
    -    if (selectionRoot != null) {
    -      String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
    -      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
    -      String[] p = path.toString().split("/");
    -      if (p.length > r.length) {
    -        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
    -        for (int a = 0; a < q.length; a++) {
    -          if (isStarQuery || selectedPartitionColumns.contains(a)) {
    -            implicitValues.put(partitionDesignator + a, q[a]);
    -          }
    -        }
    +    for(int i = 0; i < partitionValues.size(); i++) {
    --- End diff --
    
    `for (`


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981354
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.google.common.base.Functions;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.common.Stopwatch;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.ops.ExecutorFragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.ScanBatch;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
    +import org.apache.drill.exec.store.parquet2.DrillParquetReader;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.schema.Type;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractParquetScanBatchCreator {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
    +
    +  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
    +  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
    +  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
    +
    +  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
    +    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
    +
    +    if (!columnExplorer.isStarQuery()) {
    +      rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
    +      rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
    +    }
    +
    +    boolean useAsyncPageReader =
    +        context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
    +
    +    AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, useAsyncPageReader);
    +
    +    // keep footers in a map to avoid re-reading them
    +    Map<String, ParquetMetadata> footers = new HashMap<>();
    +    List<RecordReader> readers = new LinkedList<>();
    +    List<Map<String, String>> implicitColumns = new ArrayList<>();
    +    Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
    +    for(RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r184401600
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -158,25 +159,26 @@ public void close() {
         } catch (RuntimeException e) {
           ex = ex == null ? e : ex;
         }
    -    try {
    -      if (fs != null) {
    +
    +    for (DrillFileSystem fs : fileSystems) {
    +      try {
             fs.close();
    -        fs = null;
    -      }
    -    } catch (IOException e) {
    +      } catch (IOException e) {
           throw UserException.resourceError(e)
    -        .addContext("Failed to close the Drill file system for " + getName())
    -        .build(logger);
    +          .addContext("Failed to close the Drill file system for " + getName())
    +          .build(logger);
    +      }
         }
    +
         if (ex != null) {
           throw ex;
         }
       }
     
       @Override
       public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
    -    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
    -    fs = new DrillFileSystem(conf, getStats());
    +    DrillFileSystem fs = new DrillFileSystem(conf, getStats());
    --- End diff --
    
    When the `AbstractParquetScanBatchCreator.getBatch` method is called, it receives one operator context, which previously allowed creating only one file system. It also receives an `AbstractParquetRowGroupScan` that contains several row groups, and those row groups may belong to different files. For Drill parquet files we create only one fs and use it to create the readers for each row group, so it was fine that the operator context allowed only one fs. But we needed to adjust this for Hive files: for Hive we need to create an fs per file (since the config for each file system is different and is created using the projection pusher), which is why I had to change the operator context to allow more than one file system. I have also introduced `AbstractDrillFileSystemManager`, which controls the number of file systems created. `ParquetDrillFileSystemManager` creates only one (as was done before). `HiveDrillNativeParquetDrillFileSystemManager` creates an fs per file, so when two row groups belong to the same file, they get the same fs.
    
    But I agree that for a tracking fs (i.e. when store.parquet.reader.pagereader.async is set to false) this would create a mess in the calculations. So I suggest the following fix: for Hive we'll always create a non-tracking fs, and for Drill it will depend on the store.parquet.reader.pagereader.async option. I'll also add checks in the operator context to disallow creating more than one tracking fs, and to disallow creating a tracking fs at all when a non-tracking one has already been created.
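
    Roughly, that split looks like the sketch below (illustrative only, based on the description above rather than the exact Drill code; only `newFileSystem(Configuration)` from the diff is assumed, and the tracking vs. non-tracking choice is left out):
    ```
    import org.apache.drill.exec.ops.OperatorContext;
    import org.apache.drill.exec.store.dfs.DrillFileSystem;
    import org.apache.hadoop.conf.Configuration;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    abstract class AbstractDrillFileSystemManager {
      protected final OperatorContext operatorContext;

      protected AbstractDrillFileSystemManager(OperatorContext operatorContext) {
        this.operatorContext = operatorContext;
      }

      // Returns the DrillFileSystem to use for the given row group's file.
      abstract DrillFileSystem get(Configuration config, String path) throws IOException;
    }

    // Drill parquet: one file system, shared by all row groups of the scan.
    class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
      private DrillFileSystem fs;

      ParquetDrillFileSystemManager(OperatorContext operatorContext) {
        super(operatorContext);
      }

      @Override
      DrillFileSystem get(Configuration config, String path) throws IOException {
        if (fs == null) {
          fs = operatorContext.newFileSystem(config);
        }
        return fs;
      }
    }

    // Hive native parquet: one file system per file, reused by row groups of the same file.
    class HiveDrillNativeParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
      private final Map<String, DrillFileSystem> fileSystems = new HashMap<>();

      HiveDrillNativeParquetDrillFileSystemManager(OperatorContext operatorContext) {
        super(operatorContext);
      }

      @Override
      DrillFileSystem get(Configuration config, String path) throws IOException {
        DrillFileSystem fs = fileSystems.get(path);
        if (fs == null) {
          fs = operatorContext.newFileSystem(config);
          fileSystems.put(path, fs);
        }
        return fs;
      }
    }
    ```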


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183635975
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.google.common.base.Functions;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.common.Stopwatch;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.ops.ExecutorFragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.ScanBatch;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
    +import org.apache.drill.exec.store.parquet2.DrillParquetReader;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.schema.Type;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractParquetScanBatchCreator {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
    +
    +  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
    +  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
    +  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
    +
    +  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
    +    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
    +
    +    if (!columnExplorer.isStarQuery()) {
    +      rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
    +      rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
    +    }
    +
    +    boolean useAsyncPageReader =
    +        context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
    +
    +    AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, useAsyncPageReader);
    +
    +    // keep footers in a map to avoid re-reading them
    +    Map<String, ParquetMetadata> footers = new HashMap<>();
    +    List<RecordReader> readers = new LinkedList<>();
    +    List<Map<String, String>> implicitColumns = new ArrayList<>();
    +    Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
    +    for(RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
    +      /*
    +      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
    +      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
    +      we should add more information to the RowGroupInfo that will be populated upon the first read to
    +      provide the reader with all of th file meta-data it needs
    +      These fields will be added to the constructor below
    +      */
    +      try {
    +        Stopwatch timer = Stopwatch.createUnstarted(logger.isDebugEnabled());
    +        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
    +        if (!footers.containsKey(rowGroup.getPath())) {
    +          timer.start();
    +
    +          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath());
    +          long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
    +          logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
    +          footers.put(rowGroup.getPath(), footer);
    +        }
    +        ParquetMetadata footer = footers.get(rowGroup.getPath());
    +
    +        boolean autoCorrectCorruptDates = rowGroupScan.areCorruptDatesAutoCorrected();
    +        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(),
    --- End diff --
    
    new line


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981250
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java ---
    @@ -156,43 +157,74 @@ public static boolean isPartitionColumn(String partitionDesignator, String path)
       }
     
       /**
    -   * Compares selection root and actual file path to determine partition columns values.
    -   * Adds implicit file columns according to columns list.
    +   * Creates map with implicit columns where key is column name, value is columns actual value.
    +   * This map contains partition and implicit file columns (if requested).
    +   * Partition columns names are formed based in partition designator and value index.
        *
    -   * @return map with columns names as keys and their values
    +   * @param filePath file path, used to populate file implicit columns
    +   * @param partitionValues list of partition values
    +   * @param includeFileImplicitColumns if file implicit columns should be included into the result
    +   * @return implicit columns map
        */
    -  public Map<String, String> populateImplicitColumns(FileWork work, String selectionRoot) {
    -    return populateImplicitColumns(work.getPath(), selectionRoot);
    -  }
    +  public Map<String, String> populateImplicitColumns(String filePath,
    +                                                     List<String> partitionValues,
    +                                                     boolean includeFileImplicitColumns) {
    +    Map<String, String> implicitValues = new LinkedHashMap<>();
     
    -  /**
    -   * Compares selection root and actual file path to determine partition columns values.
    -   * Adds implicit file columns according to columns list.
    -   *
    -   * @return map with columns names as keys and their values
    -   */
    -  public Map<String, String> populateImplicitColumns(String filePath, String selectionRoot) {
    -    Map<String, String> implicitValues = Maps.newLinkedHashMap();
    -    if (selectionRoot != null) {
    -      String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
    -      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
    -      String[] p = path.toString().split("/");
    -      if (p.length > r.length) {
    -        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
    -        for (int a = 0; a < q.length; a++) {
    -          if (isStarQuery || selectedPartitionColumns.contains(a)) {
    -            implicitValues.put(partitionDesignator + a, q[a]);
    -          }
    -        }
    +    for(int i = 0; i < partitionValues.size(); i++) {
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183251188
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java ---
    @@ -0,0 +1,95 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    indent


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183633688
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.google.common.base.Functions;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.common.Stopwatch;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.ops.ExecutorFragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.ScanBatch;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
    +import org.apache.drill.exec.store.parquet2.DrillParquetReader;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.CodecFactory;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +import org.apache.parquet.schema.Type;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractParquetScanBatchCreator {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
    +
    +  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
    +  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
    +  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
    +
    +  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
    +    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
    +
    +    if (!columnExplorer.isStarQuery()) {
    +      rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
    +      rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
    +    }
    +
    +    boolean useAsyncPageReader =
    +        context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
    +
    +    AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, useAsyncPageReader);
    +
    +    // keep footers in a map to avoid re-reading them
    +    Map<String, ParquetMetadata> footers = new HashMap<>();
    +    List<RecordReader> readers = new LinkedList<>();
    +    List<Map<String, String>> implicitColumns = new ArrayList<>();
    +    Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
    +    for(RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
    --- End diff --
    
    `for (`


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183980574
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java ---
    @@ -1,114 +1,223 @@
     /*
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981138
  
    --- Diff: contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---
    @@ -64,16 +68,17 @@ public static synchronized HiveTestDataGenerator getInstance(File baseDir) throw
           final String dbDir = dbDirFile.getAbsolutePath();
           final String whDir = whDirFile.getAbsolutePath();
     
    -      instance = new HiveTestDataGenerator(dbDir, whDir);
    +      instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
           instance.generateTestData();
         }
     
         return instance;
       }
     
    -  private HiveTestDataGenerator(final String dbDir, final String whDir) {
    +  private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) {
         this.dbDir = dbDir;
         this.whDir = whDir;
    +    this.dirTestWatcher = dirTestWatcher;
     
         config = Maps.newHashMap();
         config.put("hive.metastore.uris", "");
    --- End diff --
    
    Replaced.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183919198
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -158,25 +159,26 @@ public void close() {
         } catch (RuntimeException e) {
           ex = ex == null ? e : ex;
         }
    -    try {
    -      if (fs != null) {
    +
    +    for (DrillFileSystem fs : fileSystems) {
    +      try {
             fs.close();
    -        fs = null;
    -      }
    -    } catch (IOException e) {
    +      } catch (IOException e) {
           throw UserException.resourceError(e)
    -        .addContext("Failed to close the Drill file system for " + getName())
    -        .build(logger);
    +          .addContext("Failed to close the Drill file system for " + getName())
    +          .build(logger);
    +      }
         }
    +
         if (ex != null) {
           throw ex;
         }
       }
     
       @Override
       public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
    -    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
    -    fs = new DrillFileSystem(conf, getStats());
    +    DrillFileSystem fs = new DrillFileSystem(conf, getStats());
    --- End diff --
    
    I don't get why you need multiple DrillFileSystems per operator context. The reason for the DrillFileSystem abstraction (and for tying it to the operator context) is to track the time a (scan) operator spends waiting for a file system call to return. This is reported as the operator's wait time in the query profile. For scans this is a critical number, as the time spent waiting for disk reads determines whether the query is disk bound.
    Associating multiple file system objects with a single operator context will throw that math out of whack, I think.
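
    To illustrate the accounting at stake (a schematic only, not Drill's actual DrillFileSystem internals; the `WaitStats` type and its method are hypothetical stand-ins for the per-operator counters):
    ```
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical per-operator wait-time counter.
    interface WaitStats {
      void addWaitNanos(long nanos);
    }

    // Every blocking file-system call charges its wait time to the single operator
    // that owns the stats object, which is what the query profile then reports.
    class TrackingOpener {
      private final WaitStats stats;

      TrackingOpener(WaitStats stats) {
        this.stats = stats;
      }

      InputStream open(Path path) throws IOException {
        long start = System.nanoTime();
        try {
          return Files.newInputStream(path);
        } finally {
          stats.addWaitNanos(System.nanoTime() - start); // all waits roll up into one operator's number
        }
      }
    }
    ```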



---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183644277
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.parquet.schema.OriginalType;
    +import org.apache.parquet.schema.PrimitiveType;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3;
    +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
    +
    +/**
    + * Holds common statistics about data in parquet group scan,
    + * including information about total row count, columns counts, partition columns.
    + */
    +public class ParquetGroupScanStatistics {
    +
    +  // map from file names to maps of column name to partition value mappings
    +  private Map<String, Map<SchemaPath, Object>> partitionValueMap;
    +  // only for partition columns : value is unique for each partition
    +  private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap;
    +  // total number of non-null value for each column in parquet files
    +  private Map<SchemaPath, Long> columnValueCounts;
    +  // total number of rows (obtained from parquet footer)
    +  private long rowCount;
    +
    +
    +  public ParquetGroupScanStatistics(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) {
    +    collect(rowGroupInfos, parquetTableMetadata);
    +  }
    +
    +  public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) {
    +    this.partitionValueMap = new HashMap<>(that.partitionValueMap);
    +    this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap);
    +    this.columnValueCounts = new HashMap<>(that.columnValueCounts);
    +    this.rowCount = that.rowCount;
    +  }
    +
    +  public long getColumnValueCount(SchemaPath column) {
    +    return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0;
    +  }
    +
    +  public List<SchemaPath> getPartitionColumns() {
    +    return new ArrayList<>(partitionColTypeMap.keySet());
    +  }
    +
    +  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
    +    return partitionColTypeMap.get(schemaPath);
    +  }
    +
    +  public long getRowCount() {
    +    return rowCount;
    +  }
    +
    +  public Object getPartitionValue(String path, SchemaPath column) {
    +    return partitionValueMap.get(path).get(column);
    +  }
    +
    +  public void collect(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) {
    +    resetHolders();
    +    boolean first = true;
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      long rowCount = rowGroup.getRowCount();
    +      for (ColumnMetadata column : rowGroup.getColumns()) {
    +        SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
    +        Long previousCount = columnValueCounts.get(schemaPath);
    +        if (previousCount != null) {
    +          if (previousCount != GroupScan.NO_COLUMN_STATS) {
    +            if (column.getNulls() != null) {
    --- End diff --
    
    Combine this if statement with the one above.
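    
    For context, one way to apply this to the snippet above, assuming the middle if contains nothing but the inner one in the full source (the bodies are cut off in this diff excerpt):
    
        if (previousCount != null) {
          // merged check: previous count is a real statistic and the null count is known
          if (previousCount != GroupScan.NO_COLUMN_STATS && column.getNulls() != null) {
            // ... body not shown in the diff excerpt ...
          }
        }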



---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183983380
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonIgnore;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.ListMultimap;
    +import org.apache.drill.common.expression.ErrorCollector;
    +import org.apache.drill.common.expression.ErrorCollectorImpl;
    +import org.apache.drill.common.expression.ExpressionStringBuilder;
    +import org.apache.drill.common.expression.LogicalExpression;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.expression.ValueExpressions;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
    +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    +import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    +import org.apache.drill.exec.ops.UdfUtilities;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.drill.exec.physical.base.ScanStats;
    +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.dfs.FileSelection;
    +import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
    +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
    +import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
    +import org.apache.drill.exec.store.schedule.AffinityCreator;
    +import org.apache.drill.exec.store.schedule.AssignmentCreator;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +
    +public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
    +
    +  protected List<SchemaPath> columns;
    +  protected List<ReadEntryWithPath> entries;
    +  protected LogicalExpression filter;
    +
    +  protected ParquetTableMetadataBase parquetTableMetadata;
    +  protected List<RowGroupInfo> rowGroupInfos;
    +  protected ListMultimap<Integer, RowGroupInfo> mappings;
    +  protected Set<String> fileSet;
    +
    +  private List<EndpointAffinity> endpointAffinities;
    +  private ParquetGroupScanStatistics parquetGroupScanStatistics;
    +
    +  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
    +    super(userName);
    +    this.columns = columns;
    +    this.entries = entries;
    +    this.filter = filter;
    +  }
    +
    +  // immutable copy constructor
    +  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
    +    super(that);
    +    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
    +    this.parquetTableMetadata = that.parquetTableMetadata;
    +    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
    +    this.filter = that.filter;
    +    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
    +    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
    +    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
    +    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
    +    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
    +  }
    +
    +  @JsonProperty
    +  public List<SchemaPath> getColumns() {
    +    return columns;
    +  }
    +
    +  @JsonProperty
    +  public List<ReadEntryWithPath> getEntries() {
    +    return entries;
    +  }
    +
    +  @JsonIgnore
    +  @Override
    +  public Collection<String> getFiles() {
    +    return fileSet;
    +  }
    +
    +  @Override
    +  public boolean hasFiles() {
    +    return true;
    +  }
    +
    +  @Override
    +  public boolean canPushdownProjects(List<SchemaPath> columns) {
    +    return true;
    +  }
    +
    +  /**
    +   * Return column value count for the specified column.
    +   * If does not contain such column, return 0.
    +   * Is used when applying convert to direct scan rule.
    +   *
    +   * @param column column schema path
    +   * @return column value count
    +   */
    +  @Override
    +  public long getColumnValueCount(SchemaPath column) {
    +    return parquetGroupScanStatistics.getColumnValueCount(column);
    +  }
    +
    +  /**
    +   * Calculates the affinity each endpoint has for this scan,
    +   * by adding up the affinity each endpoint has for each rowGroup.
    +   *
    +   * @return a list of EndpointAffinity objects
    +   */
    +  @Override
    +  public List<EndpointAffinity> getOperatorAffinity() {
    +    return endpointAffinities;
    +  }
    +
    +  @Override
    +  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
    +    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
    +  }
    +
    +  @Override
    +  public int getMaxParallelizationWidth() {
    +    return rowGroupInfos.size();
    +  }
    +
    +  @Override
    +  public String getDigest() {
    +    return toString();
    +  }
    +
    +  @Override
    +  public ScanStats getScanStats() {
    +    int columnCount = columns == null ? 20 : columns.size();
    +    long rowCount = parquetGroupScanStatistics.getRowCount();
    +    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
    +    logger.trace("Drill parquet scan statistics: {}", scanStats);
    +    return scanStats;
    +  }
    +
    +  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
    +    assert minorFragmentId < mappings.size() : String
    +        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
    +            mappings.size(), minorFragmentId);
    +
    +    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
    +
    +    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
    +        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
    +
    +    List<RowGroupReadEntry> entries = new ArrayList<>();
    +    for (RowGroupInfo rgi : rowGroupsForMinor) {
    +      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
    +      entries.add(entry);
    +    }
    +    return entries;
    +  }
    +
    +  // filter push down methods block start
    +  @JsonProperty
    +  @Override
    +  public LogicalExpression getFilter() {
    +    return filter;
    +  }
    +
    +  public void setFilter(LogicalExpression filter) {
    +    this.filter = filter;
    +  }
    +
    +  @Override
    +  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
    +                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
    +
    +    if (rowGroupInfos.size() == 1 ||
    +        ! (parquetTableMetadata.isRowGroupPrunable()) ||
    +        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
    +        ) {
    +      // Stop pruning for 3 cases:
    +      //    -  1 single parquet file,
    +      //    -  metadata does not have proper format to support row group level filter pruning,
    +      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
    +      return null;
    +    }
    +
    +    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
    +
    +    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
    +    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
    +
    +    ParquetFilterPredicate filterPredicate = null;
    +
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
    +      List<String> partitionValues = getPartitionValues(rowGroup);
    +      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
    +
    +      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
    +          parquetTableMetadata,
    +          rowGroup.getColumns(),
    +          implicitColValues);
    +
    +      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
    +
    +      if (filterPredicate == null) {
    +        ErrorCollector errorCollector = new ErrorCollectorImpl();
    +        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
    +            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
    +
    +        if (errorCollector.hasErrors()) {
    +          logger.error("{} error(s) encountered when materialize filter expression : {}",
    +              errorCollector.getErrorCount(), errorCollector.toErrorString());
    +          return null;
    +        }
    +        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    +
    +        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
    +        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
    +            materializedFilter, constantBoundaries, udfUtilities);
    +
    +        if (filterPredicate == null) {
    +          return null;
    +        }
    +      }
    +
    +      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
    +        continue;
    +      }
    +
    +      qualifiedRGs.add(rowGroup);
    +      qualifiedFilePath.add(rowGroup.getPath());  // TODO : optimize when 1 file contains m row groups.
    +    }
    +
    +    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
    +      // There is no reduction of rowGroups. Return the original groupScan.
    +      logger.debug("applyFilter does not have any pruning!");
    +      return null;
    +    } else if (qualifiedFilePath.size() == 0) {
    +      logger.debug("All rowgroups have been filtered out. Add back one to get schema from scannner");
    +      RowGroupInfo rg = rowGroupInfos.iterator().next();
    +      qualifiedFilePath.add(rg.getPath());
    +      qualifiedRGs.add(rg);
    +    }
    +
    +    logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
    +
    +    try {
    +      AbstractParquetGroupScan cloneGroupScan = cloneWithFileSelection(qualifiedFilePath);
    +      cloneGroupScan.rowGroupInfos = qualifiedRGs;
    +      cloneGroupScan.parquetGroupScanStatistics.collect(cloneGroupScan.rowGroupInfos, cloneGroupScan.parquetTableMetadata);
    +      return cloneGroupScan;
    +
    +    } catch (IOException e) {
    +      logger.warn("Could not apply filter prune due to Exception : {}", e);
    +      return null;
    +    }
    +  }
    +  // filter push down methods block end
    +
    +  // limit push down methods start
    +  @Override
    +  public boolean supportsLimitPushdown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public GroupScan applyLimit(int maxRecords) {
    +    maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
    +    // further optimization : minimize # of files chosen, or the affinity of files chosen.
    +
    +    // Calculate number of rowGroups to read based on maxRecords and update
    +    // number of records to read for each of those rowGroups.
    +    int index = updateRowGroupInfo(maxRecords);
    +
    +    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
    +        .map(ReadEntryWithPath::getPath)
    +        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
    +
    +    // If there is no change in fileSet, no need to create new groupScan.
    +    if (filePaths.size() == fileSet.size() ) {
    +      // There is no reduction of rowGroups. Return the original groupScan.
    +      logger.debug("applyLimit() does not apply!");
    +      return null;
    +    }
    +
    +    logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), filePaths.size());
    +
    +    try {
    +      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
    +      newScan.updateRowGroupInfo(maxRecords);
    +      return newScan;
    +    } catch (IOException e) {
    +      logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
    +      return null;
    +    }
    +  }
    +  // limit push down methods end
    +
    +  // partition pruning methods start
    +  @Override
    +  public List<SchemaPath> getPartitionColumns() {
    +    return parquetGroupScanStatistics.getPartitionColumns();
    +  }
    +
    +  @JsonIgnore
    +  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
    +    return parquetGroupScanStatistics.getTypeForColumn(schemaPath);
    +  }
    +
    +  @JsonIgnore
    +  public <T> T getPartitionValue(String path, SchemaPath column, Class<T> clazz) {
    +    return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column));
    +  }
    +
    +  @JsonIgnore
    +  public Set<String> getFileSet() {
    +    return fileSet;
    +  }
    +  // partition pruning methods end
    +
    +  // helper method used for partition pruning and filter push down
    +  @Override
    +  public void modifyFileSelection(FileSelection selection) {
    +    List<String> files = selection.getFiles();
    +    fileSet = new HashSet<>(files);
    +    entries = new ArrayList<>(files.size());
    +
    +    entries.addAll(files.stream()
    +        .map(ReadEntryWithPath::new)
    +        .collect(Collectors.toList()));
    +
    +    rowGroupInfos = rowGroupInfos.stream()
    +        .filter(rowGroupInfo -> fileSet.contains(rowGroupInfo.getPath()))
    +        .collect(Collectors.toList());
    +  }
    +
    +
    +  // protected methods block
    +  protected void init() throws IOException {
    +    initInternal();
    +
    +    assert parquetTableMetadata != null;
    +
    +    if (fileSet == null) {
    +      fileSet = new HashSet<>();
    +      fileSet.addAll(parquetTableMetadata.getFiles().stream()
    +          .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
    +          .collect(Collectors.toList()));
    --- End diff --
    
    Agree, fixed.
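    
    (The suggestion being agreed to is not quoted in this excerpt; a plausible simplification of the snippet above is to collect straight into a set and drop the explicit Function cast:)
    
        fileSet = parquetTableMetadata.getFiles().stream()
            .map(ParquetFileMetadata::getPath)
            .collect(Collectors.toSet());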


---

[GitHub] drill issue #1214: DRILL-6331: Revisit Hive Drill native parquet implementat...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1214
  
    @vdiravka / @parthchandra please review.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981479
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.parquet.schema.OriginalType;
    +import org.apache.parquet.schema.PrimitiveType;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3;
    +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
    +
    +/**
    + * Holds common statistics about data in parquet group scan,
    + * including information about total row count, columns counts, partition columns.
    + */
    +public class ParquetGroupScanStatistics {
    +
    +  // map from file names to maps of column name to partition value mappings
    +  private Map<String, Map<SchemaPath, Object>> partitionValueMap;
    +  // only for partition columns : value is unique for each partition
    +  private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap;
    +  // total number of non-null value for each column in parquet files
    +  private Map<SchemaPath, Long> columnValueCounts;
    +  // total number of rows (obtained from parquet footer)
    +  private long rowCount;
    +
    +
    +  public ParquetGroupScanStatistics(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) {
    +    collect(rowGroupInfos, parquetTableMetadata);
    +  }
    +
    +  public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) {
    +    this.partitionValueMap = new HashMap<>(that.partitionValueMap);
    +    this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap);
    +    this.columnValueCounts = new HashMap<>(that.columnValueCounts);
    +    this.rowCount = that.rowCount;
    +  }
    +
    +  public long getColumnValueCount(SchemaPath column) {
    +    return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0;
    +  }
    +
    +  public List<SchemaPath> getPartitionColumns() {
    +    return new ArrayList<>(partitionColTypeMap.keySet());
    +  }
    +
    +  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
    +    return partitionColTypeMap.get(schemaPath);
    +  }
    +
    +  public long getRowCount() {
    +    return rowCount;
    +  }
    +
    +  public Object getPartitionValue(String path, SchemaPath column) {
    +    return partitionValueMap.get(path).get(column);
    +  }
    +
    +  public void collect(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) {
    +    resetHolders();
    +    boolean first = true;
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      long rowCount = rowGroup.getRowCount();
    +      for (ColumnMetadata column : rowGroup.getColumns()) {
    +        SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
    +        Long previousCount = columnValueCounts.get(schemaPath);
    +        if (previousCount != null) {
    +          if (previousCount != GroupScan.NO_COLUMN_STATS) {
    +            if (column.getNulls() != null) {
    --- End diff --
    
    Changed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183633623
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java ---
    @@ -147,10 +147,12 @@ CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) t
         List<RecordReader> readers = new LinkedList<>();
         List<Map<String, String>> implicitColumns = Lists.newArrayList();
         Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
    +    boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
         for(FileWork work : scan.getWorkUnits()){
    --- End diff --
    
    `for (`


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183251213
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java ---
    @@ -0,0 +1,95 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to you under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.drill.exec.store.schedule.CompleteWork;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +
    +import java.util.List;
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
    +
    +public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
    +
    +    private EndpointByteMap byteMap;
    +    private int rowGroupIndex;
    +    private List<? extends ColumnMetadata> columns;
    +    private long rowCount;  // rowCount = -1 indicates to include all rows.
    +    private long numRecordsToRead;
    +
    +    @JsonCreator
    +    public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
    +                        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) {
    +      super(path, start, length);
    +      this.rowGroupIndex = rowGroupIndex;
    +      this.rowCount = rowCount;
    +      this.numRecordsToRead = rowCount;
    +    }
    +
    +    public RowGroupReadEntry getRowGroupReadEntry() {
    +      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(),
    +                                   this.rowGroupIndex, this.getNumRecordsToRead());
    +    }
    +
    +    public int getRowGroupIndex() {
    +      return this.rowGroupIndex;
    +    }
    +
    +    @Override
    +    public int compareTo(CompleteWork o) {
    +      return Long.compare(getTotalBytes(), o.getTotalBytes());
    +    }
    +
    +    @Override
    +    public long getTotalBytes() {
    +      return this.getLength();
    +    }
    +
    +    @Override
    +    public EndpointByteMap getByteMap() {
    +      return byteMap;
    +    }
    +
    +    public long getNumRecordsToRead() {
    +      return numRecordsToRead;
    +    }
    +
    +    public void setNumRecordsToRead(long numRecords) {
    +      numRecordsToRead = numRecords;
    +    }
    +
    +    public void setEndpointByteMap(EndpointByteMap byteMap) {
    +      this.byteMap = byteMap;
    +    }
    +
    +    public long getRowCount() {
    +      return rowCount;
    +    }
    +
    +    public List<? extends ColumnMetadata> getColumns() {
    +      return columns;
    +    }
    +
    +    public void setColumns(List<? extends ColumnMetadata> columns) {
    +      this.columns = columns;
    +    }
    +
    +  }
    --- End diff --
    
    new line


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r184199682
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -158,25 +159,26 @@ public void close() {
         } catch (RuntimeException e) {
           ex = ex == null ? e : ex;
         }
    -    try {
    -      if (fs != null) {
    +
    +    for (DrillFileSystem fs : fileSystems) {
    +      try {
             fs.close();
    -        fs = null;
    -      }
    -    } catch (IOException e) {
    +      } catch (IOException e) {
           throw UserException.resourceError(e)
    -        .addContext("Failed to close the Drill file system for " + getName())
    -        .build(logger);
    +          .addContext("Failed to close the Drill file system for " + getName())
    +          .build(logger);
    +      }
         }
    +
         if (ex != null) {
           throw ex;
         }
       }
     
       @Override
       public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
    -    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
    -    fs = new DrillFileSystem(conf, getStats());
    +    DrillFileSystem fs = new DrillFileSystem(conf, getStats());
    --- End diff --
    
    I'm not suggesting we use the same fs for each split, but the opposite. The fs object used per split/rowgroup should be different so that we get the right fs wait time for every minor fragment. But this change allows more than one fs object per operator context, which we were explicitly preventing earlier. I'm not sure I understand why you needed to change that.
    



---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183250175
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java ---
    @@ -1,114 +1,223 @@
     /*
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    indent


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183644581
  
    --- Diff: contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java ---
    @@ -0,0 +1,247 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    indent


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183981301
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java ---
    @@ -147,10 +147,12 @@ CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) t
         List<RecordReader> readers = new LinkedList<>();
         List<Map<String, String>> implicitColumns = Lists.newArrayList();
         Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
    +    boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
         for(FileWork work : scan.getWorkUnits()){
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183253923
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java ---
    @@ -0,0 +1,130 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    indent


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183980648
  
    --- Diff: contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java ---
    @@ -0,0 +1,247 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r184188427
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -158,25 +159,26 @@ public void close() {
         } catch (RuntimeException e) {
           ex = ex == null ? e : ex;
         }
    -    try {
    -      if (fs != null) {
    +
    +    for (DrillFileSystem fs : fileSystems) {
    +      try {
             fs.close();
    -        fs = null;
    -      }
    -    } catch (IOException e) {
    +      } catch (IOException e) {
           throw UserException.resourceError(e)
    -        .addContext("Failed to close the Drill file system for " + getName())
    -        .build(logger);
    +          .addContext("Failed to close the Drill file system for " + getName())
    +          .build(logger);
    +      }
         }
    +
         if (ex != null) {
           throw ex;
         }
       }
     
       @Override
       public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
    -    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
    -    fs = new DrillFileSystem(conf, getStats());
    +    DrillFileSystem fs = new DrillFileSystem(conf, getStats());
    --- End diff --
    
    @parthchandra this is definitely a good question. I did so because in the previous code a new fs was created for each Hive table split [1]. The projection pusher is used to define the fs for each split; it resolves the path for table partitions. Frankly speaking, it worked fine for me without it (all tests have passed), but Hive code uses the same approach, and apparently it was used in Drill for the same reasons. To be safe, I have done the same. If you think we can use the same fs for each row group in Hive, I can adjust the changes.
    
    [1] https://github.com/apache/drill/blob/master/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java#L112
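    
    For illustration, the per-split pattern described above boils down to something like the following (illustrative sketch only; the real code also goes through the projection pusher and Hive's JobConf handling):
    
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;
        import org.apache.drill.exec.ops.OperatorContext;
        import org.apache.drill.exec.store.dfs.DrillFileSystem;
        import org.apache.hadoop.conf.Configuration;
    
        public class PerSplitFileSystems {
          // One DrillFileSystem per split/row group, each registered with the
          // operator context so it is closed together with the operator
          // (see the BaseOperatorContext diff in this thread).
          static List<DrillFileSystem> create(OperatorContext context,
                                              List<Configuration> splitConfs) throws IOException {
            List<DrillFileSystem> fileSystems = new ArrayList<>();
            for (Configuration conf : splitConfs) {
              fileSystems.add(context.newFileSystem(conf));
            }
            return fileSystems;
          }
        }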


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183980496
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java ---
    @@ -166,25 +171,43 @@ public boolean matches(RelOptRuleCall call) {
       @Override
       public void onMatch(RelOptRuleCall call) {
         try {
    -      final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
    +      final DrillScanRel hiveScanRel = call.rel(0);
           final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
     
           final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
           final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
     
           final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
    -      checkForUnsupportedDataTypes(hiveTable);
    +      final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
    +
    +      final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183646149
  
    --- Diff: contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java ---
    @@ -0,0 +1,247 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to you under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +package org.apache.drill.exec;
    +
    +import org.apache.drill.PlanTestBase;
    +import org.apache.drill.categories.HiveStorageTest;
    +import org.apache.drill.categories.SlowTest;
    +import org.apache.drill.common.exceptions.UserRemoteException;
    +import org.apache.drill.exec.hive.HiveTestBase;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.hamcrest.CoreMatchers;
    +import org.joda.time.DateTime;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.ExpectedException;
    +
    +import java.math.BigDecimal;
    +import java.sql.Date;
    +import java.sql.Timestamp;
    +
    +import static org.hamcrest.CoreMatchers.containsString;
    +import static org.junit.Assert.assertEquals;
    +
    +@Category({SlowTest.class, HiveStorageTest.class})
    +public class TestHiveDrillNativeParquetReader extends HiveTestBase {
    +
    +  @BeforeClass
    +  public static void init() {
    +    setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, true);
    +    setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
    +  }
    +
    +  @AfterClass
    +  public static void cleanup() {
    +    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
    +    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
    +  }
    +
    +  @Rule
    +  public ExpectedException thrown = ExpectedException.none();
    +
    +  @Test
    +  public void testFilterPushDownForManagedTable() throws Exception {
    +    String query = "select * from hive.kv_native where key > 1";
    +
    +    int actualRowCount = testSql(query);
    +    assertEquals("Expected and actual row count should match", 2, actualRowCount);
    +
    +    testPlanMatchingPatterns(query,
    +        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
    +  }
    +
    +  @Test
    +  public void testFilterPushDownForExternalTable() throws Exception {
    +    String query = "select * from hive.kv_native_ext where key = 1";
    +
    +    int actualRowCount = testSql(query);
    +    assertEquals("Expected and actual row count should match", 1, actualRowCount);
    +
    +    testPlanMatchingPatterns(query,
    +        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
    --- End diff --
    
    I have added a method without `excludedPatterns` for cases where it is not necessary, but it is not merged yet.
    Is it better to pass null than to create an empty array?
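    
    For example, a hypothetical overload (assuming the existing three-argument method tolerates a null excludedPatterns array):
    
        public void testPlanMatchingPatterns(String query, String[] includedPatterns) throws Exception {
          testPlanMatchingPatterns(query, includedPatterns, null);
        }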


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1214


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183632379
  
    --- Diff: contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---
    @@ -64,16 +68,17 @@ public static synchronized HiveTestDataGenerator getInstance(File baseDir) throw
           final String dbDir = dbDirFile.getAbsolutePath();
           final String whDir = whDirFile.getAbsolutePath();
     
    -      instance = new HiveTestDataGenerator(dbDir, whDir);
    +      instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
           instance.generateTestData();
         }
     
         return instance;
       }
     
    -  private HiveTestDataGenerator(final String dbDir, final String whDir) {
    +  private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) {
         this.dbDir = dbDir;
         this.whDir = whDir;
    +    this.dirTestWatcher = dirTestWatcher;
     
         config = Maps.newHashMap();
         config.put("hive.metastore.uris", "");
    --- End diff --
    
    "hive.metastore.uris" -> ConfVars.METASTOREURIS


---

[GitHub] drill issue #1214: DRILL-6331: Revisit Hive Drill native parquet implementat...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1214
  
    @vdiravka thanks for the code review! Addressed the code review comments.


---

[GitHub] drill issue #1214: DRILL-6331: Revisit Hive Drill native parquet implementat...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on the issue:

    https://github.com/apache/drill/pull/1214
  
    Looks good. Thanks for making the changes, Arina.
    +1


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183980539
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java ---
    @@ -0,0 +1,130 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

Posted by vdiravka <gi...@git.apache.org>.
Github user vdiravka commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1214#discussion_r183558185
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet;
    +
    +import com.fasterxml.jackson.annotation.JsonIgnore;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.ListMultimap;
    +import org.apache.drill.common.expression.ErrorCollector;
    +import org.apache.drill.common.expression.ErrorCollectorImpl;
    +import org.apache.drill.common.expression.ExpressionStringBuilder;
    +import org.apache.drill.common.expression.LogicalExpression;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.expression.ValueExpressions;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
    +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    +import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    +import org.apache.drill.exec.ops.UdfUtilities;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
    +import org.apache.drill.exec.physical.base.GroupScan;
    +import org.apache.drill.exec.physical.base.ScanStats;
    +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
    +import org.apache.drill.exec.planner.physical.PlannerSettings;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.ColumnExplorer;
    +import org.apache.drill.exec.store.dfs.FileSelection;
    +import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
    +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
    +import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
    +import org.apache.drill.exec.store.schedule.AffinityCreator;
    +import org.apache.drill.exec.store.schedule.AssignmentCreator;
    +import org.apache.drill.exec.store.schedule.EndpointByteMap;
    +import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
    +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
    +
    +public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
    +
    +  protected List<SchemaPath> columns;
    +  protected List<ReadEntryWithPath> entries;
    +  protected LogicalExpression filter;
    +
    +  protected ParquetTableMetadataBase parquetTableMetadata;
    +  protected List<RowGroupInfo> rowGroupInfos;
    +  protected ListMultimap<Integer, RowGroupInfo> mappings;
    +  protected Set<String> fileSet;
    +
    +  private List<EndpointAffinity> endpointAffinities;
    +  private ParquetGroupScanStatistics parquetGroupScanStatistics;
    +
    +  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
    +    super(userName);
    +    this.columns = columns;
    +    this.entries = entries;
    +    this.filter = filter;
    +  }
    +
    +  // immutable copy constructor
    +  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
    +    super(that);
    +    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
    +    this.parquetTableMetadata = that.parquetTableMetadata;
    +    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
    +    this.filter = that.filter;
    +    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
    +    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
    +    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
    +    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
    +    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
    +  }
    +
    +  @JsonProperty
    +  public List<SchemaPath> getColumns() {
    +    return columns;
    +  }
    +
    +  @JsonProperty
    +  public List<ReadEntryWithPath> getEntries() {
    +    return entries;
    +  }
    +
    +  @JsonIgnore
    +  @Override
    +  public Collection<String> getFiles() {
    +    return fileSet;
    +  }
    +
    +  @Override
    +  public boolean hasFiles() {
    +    return true;
    +  }
    +
    +  @Override
    +  public boolean canPushdownProjects(List<SchemaPath> columns) {
    +    return true;
    +  }
    +
    +  /**
    +   * Return column value count for the specified column.
    +   * If does not contain such column, return 0.
    +   * Is used when applying convert to direct scan rule.
    +   *
    +   * @param column column schema path
    +   * @return column value count
    +   */
    +  @Override
    +  public long getColumnValueCount(SchemaPath column) {
    +    return parquetGroupScanStatistics.getColumnValueCount(column);
    +  }
    +
    +  /**
    +   * Calculates the affinity each endpoint has for this scan,
    +   * by adding up the affinity each endpoint has for each rowGroup.
    +   *
    +   * @return a list of EndpointAffinity objects
    +   */
    +  @Override
    +  public List<EndpointAffinity> getOperatorAffinity() {
    +    return endpointAffinities;
    +  }
    +
    +  @Override
    +  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
    +    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
    +  }
    +
    +  @Override
    +  public int getMaxParallelizationWidth() {
    +    return rowGroupInfos.size();
    +  }
    +
    +  @Override
    +  public String getDigest() {
    +    return toString();
    +  }
    +
    +  @Override
    +  public ScanStats getScanStats() {
    +    int columnCount = columns == null ? 20 : columns.size();
    +    long rowCount = parquetGroupScanStatistics.getRowCount();
    +    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
    +    logger.trace("Drill parquet scan statistics: {}", scanStats);
    +    return scanStats;
    +  }
    +
    +  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
    +    assert minorFragmentId < mappings.size() : String
    +        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
    +            mappings.size(), minorFragmentId);
    +
    +    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
    +
    +    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
    +        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
    +
    +    List<RowGroupReadEntry> entries = new ArrayList<>();
    +    for (RowGroupInfo rgi : rowGroupsForMinor) {
    +      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
    +      entries.add(entry);
    +    }
    +    return entries;
    +  }
    +
    +  // filter push down methods block start
    +  @JsonProperty
    +  @Override
    +  public LogicalExpression getFilter() {
    +    return filter;
    +  }
    +
    +  public void setFilter(LogicalExpression filter) {
    +    this.filter = filter;
    +  }
    +
    +  @Override
    +  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
    +                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
    +
    +    if (rowGroupInfos.size() == 1 ||
    +        ! (parquetTableMetadata.isRowGroupPrunable()) ||
    +        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
    +        ) {
    +      // Stop pruning for 3 cases:
    +      //    -  1 single parquet file,
    +      //    -  metadata does not have proper format to support row group level filter pruning,
    +      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
    +      return null;
    +    }
    +
    +    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
    +
    +    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
    +    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
    +
    +    ParquetFilterPredicate filterPredicate = null;
    +
    +    for (RowGroupInfo rowGroup : rowGroupInfos) {
    +      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
    +      List<String> partitionValues = getPartitionValues(rowGroup);
    +      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
    +
    +      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
    +          parquetTableMetadata,
    +          rowGroup.getColumns(),
    +          implicitColValues);
    +
    +      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
    +
    +      if (filterPredicate == null) {
    +        ErrorCollector errorCollector = new ErrorCollectorImpl();
    +        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
    +            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
    +
    +        if (errorCollector.hasErrors()) {
    +          logger.error("{} error(s) encountered when materialize filter expression : {}",
    +              errorCollector.getErrorCount(), errorCollector.toErrorString());
    +          return null;
    +        }
    +        //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
    +
    +        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
    +        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
    +            materializedFilter, constantBoundaries, udfUtilities);
    +
    +        if (filterPredicate == null) {
    +          return null;
    +        }
    +      }
    +
    +      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
    +        continue;
    +      }
    +
    +      qualifiedRGs.add(rowGroup);
    +      qualifiedFilePath.add(rowGroup.getPath());  // TODO : optimize when 1 file contains m row groups.
    +    }
    +
    +    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
    +      // There is no reduction of rowGroups. Return the original groupScan.
    +      logger.debug("applyFilter does not have any pruning!");
    +      return null;
    +    } else if (qualifiedFilePath.size() == 0) {
    +      logger.debug("All rowgroups have been filtered out. Add back one to get schema from scannner");
    +      RowGroupInfo rg = rowGroupInfos.iterator().next();
    +      qualifiedFilePath.add(rg.getPath());
    +      qualifiedRGs.add(rg);
    +    }
    +
    +    logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
    --- End diff --
    
    Line break: this line is too long.
    Also, should this be logger.debug rather than logger.info?
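    
    i.e., applying both suggestions to the quoted line:
    
        logger.debug("applyFilter {} reduce parquet rowgroup # from {} to {}",
            ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());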


---