Posted to commits@drill.apache.org by ar...@apache.org on 2017/11/13 12:07:25 UTC
[11/11] drill git commit: DRILL-5795: Parquet filter push down now works at rowgroup level
DRILL-5795: Parquet filter push down now works at rowgroup level
Before this commit, the filter pruned complete files only. When a file
was composed of multiple rowgroups, the filter could not prune an
individual rowgroup from the file. Now, when the filter finds that a
rowgroup does not match, that rowgroup is removed from the scan.
closes #949
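For illustration, a minimal sketch of how the change surfaces in a query
plan. The file path and data below are hypothetical (they mirror the
multirowgroup.parquet test resource added by this commit):

  -- Suppose /tmp/two_groups.parquet holds two rowgroups: one where a = 1
  -- and one where a = 2 (hypothetical file, mirroring the bundled test data).
  EXPLAIN PLAN FOR
  SELECT * FROM dfs.`/tmp/two_groups.parquet` WHERE a > 1;
  -- With this commit, the ParquetGroupScan in the resulting plan reports
  -- numRowGroups=1: the rowgroup containing only a = 1 is pruned from the
  -- scan, whereas previously the whole file would have been read.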
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3036d370
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3036d370
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3036d370
Branch: refs/heads/master
Commit: 3036d3700aa620bbbffc260e52f633cdaae1172c
Parents: 30da051
Author: Damien Profeta <da...@amadeus.com>
Authored: Fri Sep 15 11:01:58 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Nov 13 11:45:21 2017 +0200
----------------------------------------------------------------------
.../exec/store/parquet/ParquetGroupScan.java | 171 ++++++++++---------
.../parquet/TestParquetFilterPushDown.java | 10 ++
.../resources/parquet/multirowgroup.parquet | Bin 0 -> 398 bytes
3 files changed, 103 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 4e38ce9..972332c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -738,7 +738,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private EndpointByteMap byteMap;
private int rowGroupIndex;
- private String root;
+ private List<? extends ColumnMetadata> columns;
private long rowCount; // rowCount = -1 indicates to include all rows.
private long numRecordsToRead;
@@ -791,6 +791,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return rowCount;
}
+ public List<? extends ColumnMetadata> getColumns() {
+ return columns;
+ }
+
+ public void setColumns(List<? extends ColumnMetadata> columns) {
+ this.columns = columns;
+ }
+
}
/**
@@ -962,69 +970,70 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
rowGroupInfo.setEndpointByteMap(endpointByteMap);
+ rowGroupInfo.setColumns(rg.getColumns());
rgIndex++;
rowGroupInfos.add(rowGroupInfo);
}
}
this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+ updatePartitionColTypeMap();
+ }
+ private void updatePartitionColTypeMap() {
columnValueCounts = Maps.newHashMap();
this.rowCount = 0;
boolean first = true;
- for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
- for (RowGroupMetadata rowGroup : file.getRowGroups()) {
- long rowCount = rowGroup.getRowCount();
- for (ColumnMetadata column : rowGroup.getColumns()) {
- SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
- Long previousCount = columnValueCounts.get(schemaPath);
- if (previousCount != null) {
- if (previousCount != GroupScan.NO_COLUMN_STATS) {
- if (column.getNulls() != null) {
- Long newCount = rowCount - column.getNulls();
- columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
- }
- }
- } else {
+ for (RowGroupInfo rowGroup : this.rowGroupInfos) {
+ long rowCount = rowGroup.getRowCount();
+ for (ColumnMetadata column : rowGroup.getColumns()) {
+ SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
+ Long previousCount = columnValueCounts.get(schemaPath);
+ if (previousCount != null) {
+ if (previousCount != GroupScan.NO_COLUMN_STATS) {
if (column.getNulls() != null) {
Long newCount = rowCount - column.getNulls();
- columnValueCounts.put(schemaPath, newCount);
- } else {
- columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+ columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
}
}
- boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
- if (partitionColumn) {
- Map<SchemaPath, Object> map = partitionValueMap.get(file.getPath());
- if (map == null) {
- map = Maps.newHashMap();
- partitionValueMap.put(file.getPath(), map);
+ } else {
+ if (column.getNulls() != null) {
+ Long newCount = rowCount - column.getNulls();
+ columnValueCounts.put(schemaPath, newCount);
+ } else {
+ columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+ }
+ }
+ boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
+ if (partitionColumn) {
+ Map<SchemaPath, Object> map = partitionValueMap.get(rowGroup.getPath());
+ if (map == null) {
+ map = Maps.newHashMap();
+ partitionValueMap.put(rowGroup.getPath(), map);
+ }
+ Object value = map.get(schemaPath);
+ Object currentValue = column.getMaxValue();
+ if (value != null) {
+ if (value != currentValue) {
+ partitionColTypeMap.remove(schemaPath);
}
- Object value = map.get(schemaPath);
- Object currentValue = column.getMaxValue();
- if (value != null) {
- if (value != currentValue) {
- partitionColTypeMap.remove(schemaPath);
- }
+ } else {
+ // the value of a column with primitive type cannot be null,
+ // so check that the values really are all null before putting null into the map
+ if (rowCount == column.getNulls()) {
+ map.put(schemaPath, null);
} else {
- // the value of a column with primitive type cannot be null,
- // so check that the values really are all null before putting null into the map
- if (rowCount == column.getNulls()) {
- map.put(schemaPath, null);
- } else {
- map.put(schemaPath, currentValue);
- }
+ map.put(schemaPath, currentValue);
}
- } else {
- partitionColTypeMap.remove(schemaPath);
}
+ } else {
+ partitionColTypeMap.remove(schemaPath);
}
- this.rowCount += rowGroup.getRowCount();
- first = false;
}
+ this.rowCount += rowGroup.getRowCount();
+ first = false;
}
}
-
private ParquetTableMetadataBase removeUnneededRowGroups(ParquetTableMetadataBase parquetTableMetadata) {
List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
@@ -1121,6 +1130,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return "ParquetGroupScan [entries=" + entries
+ ", selectionRoot=" + selectionRoot
+ ", numFiles=" + getEntries().size()
+ + ", numRowGroups=" + rowGroupInfos.size()
+ ", usedMetadataFile=" + usedMetadataCache
+ filterStr
+ cacheFileString
@@ -1231,7 +1241,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
- if (fileSet.size() == 1 ||
+ if (rowGroupInfos.size() == 1 ||
! (parquetTableMetadata.isRowGroupPrunable()) ||
rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
) {
@@ -1244,66 +1254,71 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
- final List<RowGroupMetadata> qualifiedRGs = new ArrayList<>(parquetTableMetadata.getFiles().size());
+ final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
Set<String> qualifiedFileNames = Sets.newHashSet(); // HashSet keeps a fileName unique.
ParquetFilterPredicate filterPredicate = null;
- for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+ for (RowGroupInfo rowGroup : rowGroupInfos) {
final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, this.columns);
- Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(file.getPath(), selectionRoot);
+ Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), selectionRoot);
- for (RowGroupMetadata rowGroup : file.getRowGroups()) {
- ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
- parquetTableMetadata,
- rowGroup.getColumns(),
- implicitColValues);
+ ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+ parquetTableMetadata,
+ rowGroup.getColumns(),
+ implicitColValues);
- Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+ Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
- if (filterPredicate == null) {
- ErrorCollector errorCollector = new ErrorCollectorImpl();
- LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
- filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
-
- if (errorCollector.hasErrors()) {
- logger.error("{} error(s) encountered when materializing filter expression : {}",
- errorCollector.getErrorCount(), errorCollector.toErrorString());
- return null;
- }
- // logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+ if (filterPredicate == null) {
+ ErrorCollector errorCollector = new ErrorCollectorImpl();
+ LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+ filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
- Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
- filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
- materializedFilter, constantBoundaries, udfUtilities);
-
- if (filterPredicate == null) {
- return null;
- }
+ if (errorCollector.hasErrors()) {
+ logger.error("{} error(s) encountered when materializing filter expression : {}",
+ errorCollector.getErrorCount(), errorCollector.toErrorString());
+ return null;
}
+ // logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
- if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
- continue;
+ Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+ filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
+ materializedFilter, constantBoundaries, udfUtilities);
+
+ if (filterPredicate == null) {
+ return null;
}
+ }
- qualifiedRGs.add(rowGroup);
- qualifiedFileNames.add(file.getPath()); // TODO : optimize when 1 file contains m row groups.
+ if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
+ continue;
}
+
+ qualifiedRGs.add(rowGroup);
+ qualifiedFileNames.add(rowGroup.getPath()); // TODO : optimize when 1 file contains m row groups.
}
- if (qualifiedFileNames.size() == fileSet.size() ) {
+
+ if (qualifiedRGs.size() == rowGroupInfos.size() ) {
// There is no reduction of rowGroups. Return the original groupScan.
logger.debug("applyFilter does not have any pruning!");
return null;
} else if (qualifiedFileNames.size() == 0) {
logger.warn("All rowgroups have been filtered out. Add back one to get schema from scannner");
- qualifiedFileNames.add(fileSet.iterator().next());
+ RowGroupInfo rg = rowGroupInfos.iterator().next();
+ qualifiedFileNames.add(rg.getPath());
+ qualifiedRGs.add(rg);
}
try {
FileSelection newSelection = new FileSelection(null, Lists.newArrayList(qualifiedFileNames), getSelectionRoot(), cacheFileRoot, false);
- logger.info("applyFilter {} reduce parquet file # from {} to {}", ExpressionStringBuilder.toString(filterExpr), fileSet.size(), qualifiedFileNames.size());
- return this.clone(newSelection);
+ logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
+ ParquetGroupScan clonegroupscan = this.clone(newSelection);
+ clonegroupscan.rowGroupInfos = qualifiedRGs;
+ clonegroupscan.updatePartitionColTypeMap();
+ return clonegroupscan;
+
} catch (IOException e) {
logger.warn("Could not apply filter prune due to Exception : {}", e);
return null;
http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index fa5c8b2..8f56c45 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -384,6 +384,16 @@ public class TestParquetFilterPushDown extends PlanTestBase {
}
+ @Test
+ public void testMultiRowGroup() throws Exception {
+ // multirowgroup is a parquet file with two rowgroups inside: one with a = 1, the other with a = 2.
+ // FilterPushDown should be able to remove the rowgroup with a = 1 from the scan operator.
+ final String sql = String.format("select * from dfs_test.`%s/parquet/multirowgroup.parquet` where a > 1", TEST_RES_PATH);
+ final String[] expectedPlan = {"numRowGroups=1"};
+ final String[] excludedPlan = {};
+ PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+ }
+
//////////////////////////////////////////////////////////////////////////////////////////////////
// Some test helper functions.
//////////////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/drill/blob/3036d370/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet b/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet
new file mode 100644
index 0000000..1cb5551
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/multirowgroup.parquet differ