You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2016/01/30 05:38:31 UTC
[1/4] drill git commit: DRILL-4279: Improve performance for skipAll
query against Text/JSON/Parquet table.
Repository: drill
Updated Branches:
refs/heads/master 03197d0f2 -> 9ff947288
DRILL-4279: Improve performance for skipAll query against Text/JSON/Parquet table.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9ff94728
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9ff94728
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9ff94728
Branch: refs/heads/master
Commit: 9ff947288f3214fe8e525e001d89a4f91b8b0728
Parents: 9b4008d
Author: Jinfeng Ni <jn...@apache.org>
Authored: Tue Jan 19 16:47:08 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800
----------------------------------------------------------------------
.../apache/drill/exec/store/hive/HiveScan.java | 3 +-
.../drill/exec/TestHiveProjectPushDown.java | 8 +
.../drill/exec/planner/logical/ColumnList.java | 200 -------------------
.../exec/planner/logical/DrillScanRel.java | 9 +-
.../drill/exec/store/AbstractRecordReader.java | 38 +++-
.../exec/store/easy/json/JSONRecordReader.java | 6 +
.../compliant/CompliantTextRecordReader.java | 6 +-
.../columnreaders/ParquetRecordReader.java | 11 +
.../exec/store/text/DrillTextRecordReader.java | 5 +
.../org/apache/drill/TestProjectPushDown.java | 55 +++++
.../drill/exec/store/avro/AvroFormatTest.java | 9 +
11 files changed, 132 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 504d755..1a58cbd 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -240,7 +240,8 @@ public class HiveScan extends AbstractGroupScan {
Table hiveTable = hiveReadEntry.getTable();
projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
} else {
- projectedColumnCount = columns.size();
+ // In cost estimation, # of projected columns should be >= 1, even for a skipAll query.
+ projectedColumnCount = Math.max(columns.size(), 1);
}
return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
index 5f559ea..ce99961 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
@@ -98,6 +98,14 @@ public class TestHiveProjectPushDown extends HiveTestBase {
}
@Test
+ public void testHiveCountStar() throws Exception {
+ String query = "SELECT count(*) as cnt FROM hive.`default`.kv";
+ String expectedColNames = "\"columns\" : [ ]";
+
+ testHelper(query, 1, expectedColNames);
+ }
+
+ @Test
public void projectPushDownOnHiveParquetTable() throws Exception {
try {
test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ColumnList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ColumnList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ColumnList.java
deleted file mode 100644
index e448bc5..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ColumnList.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.GroupScan;
-
-/**
- * A list decorator that wraps a list of columns along with a {@link org.apache.drill.exec.planner.logical.ColumnList.Mode mode}
- * that informs {@link org.apache.drill.exec.store.AbstractRecordReader readers} to scan or skip the underlying list of columns.
- */
-public class ColumnList implements List<SchemaPath> {
-
- public static enum Mode {
- SKIP_ALL,
- SCAN_ALL,
- SCAN_SOME
- }
-
- private final List<SchemaPath> backend;
- private final Mode mode;
-
- protected ColumnList(List<SchemaPath> backend, Mode mode) {
- this.backend = Preconditions.checkNotNull(backend);
- this.mode = Preconditions.checkNotNull(mode);
- }
-
- public Mode getMode() {
- return mode;
- }
-
- public static ColumnList none() {
- return new ColumnList(GroupScan.ALL_COLUMNS, Mode.SKIP_ALL);
- }
-
- public static ColumnList all() {
- return new ColumnList(GroupScan.ALL_COLUMNS, Mode.SCAN_ALL);
- }
-
- public static ColumnList some(List<SchemaPath> columns) {
- return new ColumnList(columns, Mode.SCAN_SOME);
- }
-
-
- @Override
- public int size() {
- return backend.size();
- }
-
- @Override
- public boolean isEmpty() {
- return backend.isEmpty();
- }
-
- @Override
- public boolean contains(Object o) {
- return backend.contains(o);
- }
-
- @Override
- public Iterator<SchemaPath> iterator() {
- return backend.iterator();
- }
-
- @Override
- public Object[] toArray() {
- return backend.toArray();
- }
-
- @Override
- public <T> T[] toArray(T[] a) {
- return backend.toArray(a);
- }
-
- @Override
- public boolean add(SchemaPath path) {
- return backend.add(path);
- }
-
- @Override
- public boolean remove(Object o) {
- return backend.remove(o);
- }
-
- @Override
- public boolean containsAll(Collection<?> c) {
- return backend.containsAll(c);
- }
-
- @Override
- public boolean addAll(Collection<? extends SchemaPath> c) {
- return backend.addAll(c);
- }
-
- @Override
- public boolean addAll(int index, Collection<? extends SchemaPath> c) {
- return backend.addAll(index, c);
- }
-
- @Override
- public boolean removeAll(Collection<?> c) {
- return backend.removeAll(c);
- }
-
- @Override
- public boolean retainAll(Collection<?> c) {
- return backend.retainAll(c);
- }
-
- @Override
- public void clear() {
- backend.clear();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof ColumnList) {
- final ColumnList other = ColumnList.class.cast(o);
- return backend.equals(other) && Objects.equal(mode, other.mode);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(backend, mode);
- }
-
- @Override
- public SchemaPath get(int index) {
- return backend.get(index);
- }
-
- @Override
- public SchemaPath set(int index, SchemaPath element) {
- return backend.set(index, element);
- }
-
- @Override
- public void add(int index, SchemaPath element) {
- backend.add(index, element);
- }
-
- @Override
- public SchemaPath remove(int index) {
- return backend.remove(index);
- }
-
- @Override
- public int indexOf(Object o) {
- return backend.indexOf(o);
- }
-
- @Override
- public int lastIndexOf(Object o) {
- return backend.lastIndexOf(o);
- }
-
- @Override
- public ListIterator<SchemaPath> listIterator() {
- return backend.listIterator();
- }
-
- @Override
- public ListIterator<SchemaPath> listIterator(int index) {
- return backend.listIterator(index);
- }
-
- @Override
- public List<SchemaPath> subList(int fromIndex, int toIndex) {
- return backend.subList(fromIndex, toIndex);
- }
-
- @Override
- public String toString() {
- return backend.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 0d087aa..94322d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -82,13 +82,8 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
super(DRILL_LOGICAL, cluster, traits, table);
this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
this.rowType = rowType;
- if (columns == null) { // planner asks to scan all of the columns
- this.columns = ColumnList.all();
- } else if (columns.size() == 0) { // planner asks to skip all of the columns
- this.columns = ColumnList.none();
- } else { // planner asks to scan some columns
- this.columns = ColumnList.some(columns);
- }
+ Preconditions.checkNotNull(columns);
+ this.columns = columns;
this.partitionFilterPushdown = partitionFilterPushdown;
try {
this.groupScan = drillTable.getGroupScan().clone(this.columns);
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 2f1ca56..ca44d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -18,11 +18,14 @@
package org.apache.drill.exec.store;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.planner.logical.ColumnList;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.vector.ValueVector;
@@ -31,10 +34,14 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
public abstract class AbstractRecordReader implements RecordReader {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRecordReader.class);
+
private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
- private static final String COL_EMPTY_ERROR = "Readers needs at least a column to read.";
public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+ // For text readers, the default column to read is "columns[0]".
+ protected static final List<SchemaPath> DEFAULT_TEXT_COLS_TO_READ = ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("columns", new PathSegment.ArraySegment(0))));
+
private Collection<SchemaPath> columns = null;
private boolean isStarQuery = false;
private boolean isSkipQuery = false;
@@ -47,14 +54,22 @@ public abstract class AbstractRecordReader implements RecordReader {
+ ", isSkipQuery = " + isSkipQuery + "]";
}
- protected final void setColumns(Collection<SchemaPath> projected) {
- assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR;
- if (projected instanceof ColumnList) {
- final ColumnList columns = ColumnList.class.cast(projected);
- isSkipQuery = columns.getMode() == ColumnList.Mode.SKIP_ALL;
+ protected final void setColumns(List<SchemaPath> projected) {
+ Preconditions.checkNotNull(projected, COL_NULL_ERROR);
+ isSkipQuery = projected.isEmpty();
+ List<SchemaPath> columnsToRead = projected;
+
+ // If no column is required (SkipQuery), it will use the columns returned by getDefaultColumnsToRead().
+ // Handling SkipQuery is storage-plugin specific: the JSON, text and parquet readers override it in order to
+ // improve query performance.
+ if (projected.isEmpty()) {
+ columnsToRead = getDefaultColumnsToRead();
}
- isStarQuery = isStarQuery(projected);
- columns = transformColumns(projected);
+
+ isStarQuery = isStarQuery(columnsToRead);
+ columns = transformColumns(columnsToRead);
+
+ logger.debug("columns to read : {}", columns);
}
protected Collection<SchemaPath> getColumns() {
@@ -92,4 +107,9 @@ public abstract class AbstractRecordReader implements RecordReader {
v.allocateNew();
}
}
+
+ protected List<SchemaPath> getDefaultColumnsToRead() {
+ return GroupScan.ALL_COLUMNS;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 1d22fed..8d82f78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
@@ -28,6 +29,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -140,6 +142,10 @@ public class JSONRecordReader extends AbstractRecordReader {
}
}
+ protected List<SchemaPath> getDefaultColumnsToRead() {
+ return ImmutableList.of();
+ }
+
private void setupParser() throws IOException {
if(hadoopPath != null){
jsonReader.setSource(stream);
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index f6dab89..cf2359f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -28,7 +28,6 @@ import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
@@ -90,6 +89,11 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
return super.isStarQuery();
}
+ @Override
+ protected List<SchemaPath> getDefaultColumnsToRead() {
+ return DEFAULT_TEXT_COLS_TO_READ;
+ }
+
/**
* Performs the initial setup required for the record reader.
* Initializes the input stream, handling of the output record batch
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 7131b6c..61e05db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -69,6 +70,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+ // When no column is required by the downstream operator, ask SCAN to return a DEFAULT column. If such a column does not exist,
+ // it will be returned as a nullable-int column. If that column happens to exist, return that column.
+ protected static final List<SchemaPath> DEFAULT_COLS_TO_READ = ImmutableList.of(SchemaPath.getSimplePath("_DEFAULT_COL_TO_READ_"));
+
// TODO - should probably find a smarter way to set this, currently 1 megabyte
public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
@@ -508,4 +513,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
parquetReaderStats=null;
}
}
+
+ @Override
+ protected List<SchemaPath> getDefaultColumnsToRead() {
+ return DEFAULT_COLS_TO_READ;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index bc675af..d97fc58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -107,6 +107,11 @@ public class DrillTextRecordReader extends AbstractRecordReader {
}
@Override
+ protected List<SchemaPath> getDefaultColumnsToRead() {
+ return DEFAULT_TEXT_COLS_TO_READ;
+ }
+
+ @Override
public boolean isStarQuery() {
return super.isStarQuery() || Iterables.tryFind(getColumns(), new Predicate<SchemaPath>() {
private final SchemaPath COLUMNS = SchemaPath.getSimplePath("columns");
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
index 60a6f49..d926422 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
@@ -209,6 +209,61 @@ public class TestProjectPushDown extends PlanTestBase {
}
@Test
+ public void testEmptyColProjectInTextScan() throws Exception {
+ final String sql = "SELECT count(*) cnt from cp.`store/text/data/d1/regions.csv`";
+ final String expected = "\"columns\" : [ ]";
+ // Verify plan
+ testPushDown(new PushDownTestInstance(sql, new String[] {expected}));
+
+ // Verify execution result.
+ testBuilder()
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues((long) 5)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testEmptyColProjectInJsonScan() throws Exception {
+ final String sql = "SELECT count(*) cnt from cp.`employee.json`";
+ final String expected = "\"columns\" : [ ]";
+
+ testPushDown(new PushDownTestInstance(sql, new String[] {expected}));
+
+ // Verify execution result.
+ testBuilder()
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues((long) 1155)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testEmptyColProjectInParquetScan() throws Exception {
+ final String sql = "SELECT 1 + 1 as val from cp.`tpch/region.parquet`";
+ final String expected = "\"columns\" : [ ]";
+
+ testPushDown(new PushDownTestInstance(sql, new String[] {expected}));
+
+ // Verify execution result.
+ testBuilder()
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("val")
+ .baselineValues(2)
+ .baselineValues(2)
+ .baselineValues(2)
+ .baselineValues(2)
+ .baselineValues(2)
+ .build()
+ .run();
+ }
+
+ @Test
public void testSimpleProjectPastJoinPastFilterPastJoinPushDown() throws Exception {
// String sql = "select * " +
// "from cp.`%s` t0, cp.`%s` t1, cp.`%s` t2 " +
http://git-wip-us.apache.org/repos/asf/drill/blob/9ff94728/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index d718342..ecf8369 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -262,4 +262,13 @@ public class AvroFormatTest extends BaseTestQuery {
final String sql = "select * from dfs_test.`" + file + "`";
test(sql);
}
+
+ @Test
+ public void testCountStar() throws Exception {
+
+ final String file = AvroTestUtil.generateStringAndUtf8Data();
+ final String sql = "select count(*) from dfs_test.`" + file + "`";
+ test(sql);
+ }
+
}
[4/4] drill git commit: DRILL-2517: (Prototype from Adam) Apply
directory-based pruning before reading file.
Posted by jn...@apache.org.
DRILL-2517: (Prototype from Adam) Apply directory-based pruning before reading file.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/09de31ed
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/09de31ed
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/09de31ed
Branch: refs/heads/master
Commit: 09de31edfa74e09a9a79b9a0fc995b33d70f3935
Parents: 03197d0
Author: AdamPD <ad...@pharmadata.net.au>
Authored: Mon Jan 18 10:40:54 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800
----------------------------------------------------------------------
.../drill/exec/planner/logical/DrillRuleSets.java | 2 +-
.../drill/exec/planner/logical/DrillTable.java | 4 ++++
.../planner/logical/partition/PruneScanRule.java | 18 ++++++++++++------
3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/09de31ed/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index b7974f3..6f1f995 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -141,10 +141,10 @@ public class DrillRuleSets {
*/
DrillPushProjectPastFilterRule.INSTANCE,
DrillPushProjectPastJoinRule.INSTANCE,
- DrillPushProjIntoScan.INSTANCE,
// Due to infinite loop in planning (DRILL-3257), temporarily disable this rule
//DrillProjectSetOpTransposeRule.INSTANCE,
ProjectWindowTransposeRule.INSTANCE,
+ DrillPushProjIntoScan.INSTANCE,
/*
Convert from Calcite Logical to Drill Logical Rules.
http://git-wip-us.apache.org/repos/asf/drill/blob/09de31ed/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 61f242f..106290d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -85,6 +85,10 @@ public abstract class DrillTable implements Table {
return selection;
}
+ public void modifySelection(Object selection) {
+ this.selection = selection;
+ }
+
public String getStorageEngineName() {
return storageEngineName;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/09de31ed/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index ab6c5a8..4cc9c46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -24,6 +24,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.BitSets;
@@ -50,6 +53,7 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
@@ -80,7 +84,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
- RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+ RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
"PruneScanRule:Filter_On_Project",
optimizerRulesContext) {
@@ -103,9 +107,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+ final FilterRel filterRel = (DrillFilterRel) call.rel(0);
+ final ProjectRel projectRel = (DrillProjectRel) call.rel(1);
+ final ScanRel scanRel = (DrillScanRel) call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel);
}
};
@@ -113,7 +117,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
- RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+ RelOptHelper.some(Filter.class, RelOptHelper.any(EnumerableTableScan.class)),
"PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
@Override
@@ -142,7 +146,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
};
}
- protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, DrillScanRel scanRel) {
+ final DrillTable table = scanRel.getTable().unwrap(DrillTable.class);
+
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = new Stopwatch();
[3/4] drill git commit: DRILL-2517: Move directory-based partition
pruning to Calcite logical planning phase.
Posted by jn...@apache.org.
DRILL-2517: Move directory-based partition pruning to Calcite logical planning phase.
1) Make the directory-based pruning rule work in both the Calcite logical and the Drill logical planning phases.
2) Only apply directory-based pruning in logical phase when there is no metadata cache.
3) Make FileSelection constructor public, since FileSelection.create() would modify selectionRoot.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9b4008dc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9b4008dc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9b4008dc
Branch: refs/heads/master
Commit: 9b4008dc384bc5e0b9af9a048bfc93c5bcb902b2
Parents: c7dfba2
Author: Jinfeng Ni <jn...@apache.org>
Authored: Fri Jan 8 10:28:53 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800
----------------------------------------------------------------------
.../planner/sql/HivePartitionDescriptor.java | 56 +--
.../HivePushPartitionFilterIntoScan.java | 15 +-
.../planner/FileSystemPartitionDescriptor.java | 81 ++++-
.../planner/ParquetPartitionDescriptor.java | 17 +-
.../drill/exec/planner/PartitionDescriptor.java | 12 +-
.../planner/logical/DrillPushProjIntoScan.java | 8 +-
.../exec/planner/logical/DrillRuleSets.java | 21 +-
.../logical/partition/ParquetPruneScanRule.java | 19 +-
.../logical/partition/PruneScanRule.java | 346 ++++---------------
.../planner/sql/handlers/DefaultSqlHandler.java | 9 +
.../drill/exec/store/dfs/FileSelection.java | 6 +-
.../drill/exec/store/dfs/FormatSelection.java | 4 +
.../store/parquet/ParquetFileSelection.java | 4 +
.../org/apache/drill/TestExampleQueries.java | 4 -
.../org/apache/drill/TestPartitionFilter.java | 16 +
.../1995/Q1/orders_95_q1.parquet | Bin 0 -> 2180 bytes
.../1996/Q1/badFormat.parquet | 1 +
17 files changed, 281 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index e1eb25e..e531f38 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql;
import io.netty.buffer.DrillBuf;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.AbstractPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.hive.HiveUtilities;
@@ -90,27 +92,6 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
- HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
- HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
- List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
- List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
-
- for (HiveTable.HivePartition part: oldPartitions) {
- String partitionLocation = part.getPartition().getSd().getLocation();
- for (String newPartitionLocation: newFiles) {
- if (partitionLocation.equals(newPartitionLocation)) {
- newPartitions.add(part);
- }
- }
- }
-
- HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions);
-
- return hiveScan.clone(newReadEntry);
- }
-
- @Override
public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
int record = 0;
@@ -169,4 +150,37 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
sublistsCreated = true;
}
+ @Override
+ public TableScan createTableScan(List<String> newPartitions) throws Exception {
+ GroupScan newGroupScan = createNewGroupScan(newPartitions);
+ return new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ scanRel.getColumns(),
+ true /*filter pushdown*/);
+ }
+
+ private GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
+ HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+ HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
+ List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
+ List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
+
+ for (HiveTable.HivePartition part: oldPartitions) {
+ String partitionLocation = part.getPartition().getSd().getLocation();
+ for (String newPartitionLocation: newFiles) {
+ if (partitionLocation.equals(newPartitionLocation)) {
+ newPartitions.add(part);
+ }
+ }
+ }
+
+ HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions);
+
+ return hiveScan.clone(newReadEntry);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 17cd65a..0bdfe99 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -19,6 +19,7 @@
package org.apache.drill.exec.planner.sql.logical;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -44,7 +45,7 @@ public abstract class HivePushPartitionFilterIntoScan {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
@@ -63,9 +64,9 @@ public abstract class HivePushPartitionFilterIntoScan {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillProjectRel projectRel = call.rel(1);
+ final DrillScanRel scanRel = call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel);
}
};
@@ -78,7 +79,7 @@ public abstract class HivePushPartitionFilterIntoScan {
"HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
@@ -97,8 +98,8 @@ public abstract class HivePushPartitionFilterIntoScan {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillScanRel scanRel = call.rel(1);
doOnMatch(call, filterRel, null, scanRel);
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 02658ed..04a3f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -24,11 +24,14 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
@@ -36,7 +39,10 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -53,14 +59,22 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
private final String partitionLabel;
private final int partitionLabelLength;
private final Map<String, Integer> partitions = Maps.newHashMap();
- private final EnumerableTableScan scanRel;
- private final DynamicDrillTable table;
+ private final TableScan scanRel;
+ private final DrillTable table;
- public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public FileSystemPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ Preconditions.checkArgument(scanRel instanceof DrillScanRel || scanRel instanceof EnumerableTableScan);
this.partitionLabel = settings.getFsPartitionColumnLabel();
this.partitionLabelLength = partitionLabel.length();
- this.scanRel = (EnumerableTableScan) scanRel;
- table = scanRel.getTable().unwrap(DynamicDrillTable.class);
+ this.scanRel = scanRel;
+ DrillTable unwrap;
+ unwrap = scanRel.getTable().unwrap(DrillTable.class);
+ if (unwrap == null) {
+ unwrap = scanRel.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+ }
+
+ table = unwrap;
+
for(int i =0; i < 10; i++){
partitions.put(partitionLabel + i, i);
}
@@ -87,18 +101,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
return MAX_NESTED_SUBDIRS;
}
- @Override
- public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
- /*
- THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN??
- final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
- final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
- return newScan;
- */
- return null;
- }
-
- public DynamicDrillTable getTable() {
+// @Override
+// public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+// if (scanRel instanceof DrillScanRel) {
+// final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+// final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+// return newScan;
+// } else {
+// throw new UnsupportedOperationException("Does not allow to get groupScan for EnumerableTableScan");
+// }
+// }
+
+ public DrillTable getTable() {
return table;
}
@@ -152,4 +166,37 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
sublistsCreated = true;
}
+ @Override
+ public TableScan createTableScan(List<String> newFiles) throws Exception {
+ if (scanRel instanceof DrillScanRel) {
+ final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+ final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+ return new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ ((DrillScanRel) scanRel).getColumns(),
+ true /*filter pushdown*/);
+ } else if (scanRel instanceof EnumerableTableScan) {
+ return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles);
+ } else {
+ throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
+ }
+ }
+
+ private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles) {
+ final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
+ final FormatSelection formatSelection = (FormatSelection) table.getSelection();
+ final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+ final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+ final DrillTranslatableTable newTable = new DrillTranslatableTable(
+ new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
+ table.getUserName(),
+ newFormatSelection));
+ final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
+
+ return EnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index cda5a5e..81bcf03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -17,11 +17,13 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -78,8 +80,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
return partitionColumns.size();
}
- @Override
- public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+ private GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
return newScan;
@@ -128,4 +129,16 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
sublistsCreated = true;
}
+ @Override
+ public TableScan createTableScan(List<String> newFiles) throws Exception {
+ final GroupScan newGroupScan = createNewGroupScan(newFiles);
+
+ return new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ scanRel.getColumns(),
+ true /*filter pushdown*/);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 726d8bc..dd3b084 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -53,8 +54,6 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
// Maximum level of partition nesting/ hierarchy supported
public int getMaxHierarchyLevel();
- public GroupScan createNewGroupScan(List<String> newFiles) throws Exception;
-
/**
* Method creates an in memory representation of all the partitions. For each level of partitioning we
* will create a value vector which this method will populate for all the partitions with the values of the
@@ -74,4 +73,13 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
* @return
*/
TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings);
+
+ /**
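+ * Creates a new TableScan rel node that reads only the given partitions or files.
+ * @param newPartitions the partition locations or file paths the new scan should read
+ * @return a new TableScan restricted to {@code newPartitions}
+ * @throws Exception if the new scan cannot be created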
+ */
+ public TableScan createTableScan(List<String> newPartitions) throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 1fd1cd7..33c840b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -54,8 +54,14 @@ public class DrillPushProjIntoScan extends RelOptRule {
try {
ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), proj.getProjects());
+ // get DrillTable, either wrapped in RelOptTable, or DrillTranslatableTable.
+ DrillTable table = scan.getTable().unwrap(DrillTable.class);
+ if (table == null) {
+ table = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+ }
+
if (columnInfo == null || columnInfo.isStarQuery() //
- || !scan.getTable().unwrap(DrillTable.class) //
+ || !table //
.getGroupScan().canPushdownProjects(columnInfo.columns)) {
return;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 6f1f995..d9609d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -204,8 +204,8 @@ public class DrillRuleSets {
public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
.add(
- PruneScanRule.getFilterOnProject(optimizerRulesContext),
- PruneScanRule.getFilterOnScan(optimizerRulesContext),
+ PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
+ PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
)
@@ -214,6 +214,23 @@ public class DrillRuleSets {
return new DrillRuleSet(pruneRules);
}
+ /**
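+ * Returns the set of directory-based partition pruning rules used in Calcite logical planning.
+ * @param optimizerRulesContext context handed to each directory-based pruning rule
+ * @return a RuleSet holding the directory-based filter-on-project and filter-on-scan prune rules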
+ */
+ public static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
+ final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
+ .add(
+ PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
+ PruneScanRule.getDirFilterOnScan(optimizerRulesContext)
+ )
+ .build();
+
+ return new DrillRuleSet(pruneRules);
+
+ }
+
// Ruleset for join permutation, used only in VolcanoPlanner.
public static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) {
return new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add( //
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index d92d2b0..f5dbb9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.logical.partition;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -42,13 +43,13 @@ public class ParquetPruneScanRule {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(2);
+ final DrillScanRel scan = call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
@@ -60,9 +61,9 @@ public class ParquetPruneScanRule {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillProjectRel projectRel = call.rel(1);
+ final DrillScanRel scanRel = call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel);
}
};
@@ -74,13 +75,13 @@ public class ParquetPruneScanRule {
"PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(1);
+ final DrillScanRel scan = call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
@@ -92,8 +93,8 @@ public class ParquetPruneScanRule {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillScanRel scanRel = call.rel(1);
doOnMatch(call, filterRel, null, scanRel);
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index bfd6597..aefd247 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -25,8 +25,12 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.jdbc.CalciteAbstractSchema;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.BitSets;
@@ -55,6 +59,8 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -64,6 +70,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.parquet.ParquetFileSelection;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -87,282 +94,64 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
this.optimizerContext = optimizerContext;
}
- public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
- return new PruneScanRule(
- RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
- "PruneScanRule:Filter_On_Project",
- optimizerRulesContext) {
-
- @Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
- return new FileSystemPartitionDescriptor(settings, scanRel);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- /*
- final DrillScanRel scan = (DrillScanRel) call.rel(2);
- GroupScan groupScan = scan.getGroupScan();
- // this rule is applicable only for dfs based partition pruning
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- */
- return true;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final LogicalFilter filterRel = call.rel(0);
- final Project projectRel = call.rel(1);
- final EnumerableTableScan scanRel = call.rel(2);
- doOnMatchLogical(call, filterRel, projectRel, scanRel);
- }
- };
- }
-
- public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
- return new PruneScanRule(
- RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)),
- "PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
-
- @Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
- return new FileSystemPartitionDescriptor(settings, scanRel);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- /* final DrillScanRel scan = (DrillScanRel) call.rel(1);
- GroupScan groupScan = scan.getGroupScan();
- // this rule is applicable only for dfs based partition pruning
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- */
- return true;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final LogicalFilter filterRel = call.rel(0);
- final EnumerableTableScan scanRel = call.rel(1);
- doOnMatchLogical(call, filterRel, null, scanRel);
- }
- };
- }
-
- // TODO: Combine the doOnMatch and doOnMatchLogical
- protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) {
- final String pruningClassName = getClass().getName();
- logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
- Stopwatch totalPruningTime = new Stopwatch();
- totalPruningTime.start();
-
-
- final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
- PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
- final BufferAllocator allocator = optimizerContext.getAllocator();
-
-
- RexNode condition = null;
- if (projectRel == null) {
- condition = filterRel.getCondition();
- } else {
- // get the filter as if it were below the projection.
- condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+ private static class DirPruneScanFilterOnProjectRule extends PruneScanRule {
+ public DirPruneScanFilterOnProjectRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(TableScan.class))), "DirPruneScanRule:Filter_On_Project", optimizerRulesContext);
}
- RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
- condition = condition.accept(visitor);
-
- Map<Integer, String> fieldNameMap = Maps.newHashMap();
- List<String> fieldNames = scanRel.getRowType().getFieldNames();
- BitSet columnBitset = new BitSet();
- BitSet partitionColumnBitSet = new BitSet();
-
- int relColIndex = 0;
- for (String field : fieldNames) {
- final Integer partitionIndex = descriptor.getIdIfValid(field);
- if (partitionIndex != null) {
- fieldNameMap.put(partitionIndex, field);
- partitionColumnBitSet.set(partitionIndex);
- columnBitset.set(relColIndex);
- }
- relColIndex++;
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ return new FileSystemPartitionDescriptor(settings, scanRel);
}
- if (partitionColumnBitSet.isEmpty()) {
- logger.info("No partition columns are projected from the scan..continue. " +
- "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return;
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final TableScan scan = call.rel(2);
+ return isQualifiedDirPruning(scan);
}
- // stop watch to track how long we spend in different phases of pruning
- Stopwatch miscTimer = new Stopwatch();
-
- // track how long we spend building the filter tree
- miscTimer.start();
-
- FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
- c.analyze(condition);
- RexNode pruneCondition = c.getFinalCondition();
-
- logger.info("Total elapsed time to build and analyze filter tree: {} ms",
- miscTimer.elapsed(TimeUnit.MILLISECONDS));
- miscTimer.reset();
-
- if (pruneCondition == null) {
- logger.info("No conditions were found eligible for partition pruning." +
- "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return;
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Filter filterRel = call.rel(0);
+ final Project projectRel = call.rel(1);
+ final TableScan scanRel = call.rel(2);
+ doOnMatch(call, filterRel, projectRel, scanRel);
}
+ }
- // set up the partitions
- List<String> newFiles = Lists.newArrayList();
- long numTotal = 0; // total number of partitions
- int batchIndex = 0;
- String firstLocation = null;
- LogicalExpression materializedExpr = null;
-
- // Outer loop: iterate over a list of batches of PartitionLocations
- for (List<PartitionLocation> partitions : descriptor) {
- numTotal += partitions.size();
- logger.debug("Evaluating partition pruning for batch {}", batchIndex);
- if (batchIndex == 0) { // save the first location in case everything is pruned
- firstLocation = partitions.get(0).getEntirePartitionLocation();
- }
- final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
- final VectorContainer container = new VectorContainer();
-
- try {
- final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
- for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
- SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
- MajorType type = descriptor.getVectorType(column, settings);
- MaterializedField field = MaterializedField.create(column, type);
- ValueVector v = TypeHelper.getNewVector(field, allocator);
- v.allocateNew();
- vectors[partitionColumnIndex] = v;
- container.add(v);
- }
-
- // track how long we spend populating partition column vectors
- miscTimer.start();
-
- // populate partition vectors.
- descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
-
- logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
- miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
- miscTimer.reset();
-
- // materialize the expression; only need to do this once
- if (batchIndex == 0) {
- materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
- if (materializedExpr == null) {
- // continue without partition pruning; no need to log anything here since
- // materializePruneExpr logs it already
- logger.info("Total pruning elapsed time: {} ms",
- totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return;
- }
- }
-
- output.allocateNew(partitions.size());
-
- // start the timer to evaluate how long we spend in the interpreter evaluation
- miscTimer.start();
-
- InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
-
- logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
- miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
- miscTimer.reset();
-
- int recordCount = 0;
- int qualifiedCount = 0;
-
- // Inner loop: within each batch iterate over the PartitionLocations
- for(PartitionLocation part: partitions){
- if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
- newFiles.add(part.getEntirePartitionLocation());
- qualifiedCount++;
- }
- recordCount++;
- }
- logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
- batchIndex++;
- } catch (Exception e) {
- logger.warn("Exception while trying to prune partition.", e);
- logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return; // continue without partition pruning
- } finally {
- container.clear();
- if (output != null) {
- output.clear();
- }
- }
+ private static class DirPruneScanFilterOnScanRule extends PruneScanRule {
+ public DirPruneScanFilterOnScanRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Filter.class, RelOptHelper.any(TableScan.class)), "DirPruneScanRule:Filter_On_Scan", optimizerRulesContext);
}
- try {
-
- boolean canDropFilter = true;
-
- if (newFiles.isEmpty()) {
- assert firstLocation != null;
- newFiles.add(firstLocation);
- canDropFilter = false;
- }
-
- if (newFiles.size() == numTotal) {
- logger.info("No partitions were eligible for pruning");
- return;
- }
-
- logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
-
- List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
- List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
- conjuncts.removeAll(pruneConjuncts);
- RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
-
- RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());
-
- condition = condition.accept(reverseVisitor);
- pruneCondition = pruneCondition.accept(reverseVisitor);
-
- RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable();
- DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable();
- FormatSelection formatSelection = (FormatSelection) oldTable.getSelection();
- FileSelection oldFileSelection = formatSelection.getSelection();
- FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses());
- FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
- DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(),
- oldTable.getUserName(), newFormatSelection);
- RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
-
- // TODO: The new scan should come from the PartitionDescriptor
- // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan
- EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl);
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ return new FileSystemPartitionDescriptor(settings, scanRel);
+ }
- RelNode inputRel = newScan;
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final TableScan scan = call.rel(1);
+ return isQualifiedDirPruning(scan);
+ }
- if (projectRel != null) {
- inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
- }
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Filter filterRel = call.rel(0);
+ final TableScan scanRel = call.rel(1);
+ doOnMatch(call, filterRel, null, scanRel);
+ }
+ }
- if (newCondition.isAlwaysTrue() && canDropFilter) {
- call.transformTo(inputRel);
- } else {
- final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
- call.transformTo(newFilter);
- }
+ public static final RelOptRule getDirFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
+ return new DirPruneScanFilterOnProjectRule(optimizerRulesContext);
+ }
- } catch (Exception e) {
- logger.warn("Exception while using the pruned partitions.", e);
- } finally {
- logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- }
+ public static final RelOptRule getDirFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
+ return new DirPruneScanFilterOnScanRule(optimizerRulesContext);
}
- protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = new Stopwatch();
@@ -373,7 +162,6 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
final BufferAllocator allocator = optimizerContext.getAllocator();
-
RexNode condition = null;
if (projectRel == null) {
condition = filterRel.getCondition();
@@ -541,16 +329,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
condition = condition.accept(reverseVisitor);
pruneCondition = pruneCondition.accept(reverseVisitor);
- final DrillScanRel newScanRel =
- new DrillScanRel(scanRel.getCluster(),
- scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
- scanRel.getTable(),
- descriptor.createNewGroupScan(newFiles),
- scanRel.getRowType(),
- scanRel.getColumns(),
- true /*filter pushdown*/);
-
- RelNode inputRel = newScanRel;
+ RelNode inputRel = descriptor.createTableScan(newFiles);
if (projectRel != null) {
inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
@@ -602,5 +381,28 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return optimizerContext;
}
- public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel);
+ public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel);
+
+ private static boolean isQualifiedDirPruning(final TableScan scan) {
+ if (scan instanceof EnumerableTableScan) {
+ DrillTable drillTable;
+ drillTable = scan.getTable().unwrap(DrillTable.class);
+ if (drillTable == null) {
+ drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+ }
+ final Object selection = drillTable.getSelection();
+ if (selection instanceof FormatSelection
+ && ((FormatSelection)selection).supportDirPruning()) {
+ return true; // Do directory-based pruning in Calcite logical
+ } else {
+ return false; // Do not do directory-based pruning in Calcite logical
+ }
+ } else if (scan instanceof DrillScanRel) {
+ final GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
+ // this rule is applicable only for dfs based partition pruning in Drill Logical
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !((DrillScanRel)scan).partitionFilterPushdown();
+ }
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 686f7d7..d6bdc78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -534,6 +534,15 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
private RelNode doLogicalPlanning(RelNode relNode) throws RelConversionException, SqlUnsupportedException {
+ // 1. Call HepPlanner with directory-based partition pruning, in Calcite logical rel
+ // Partition pruning .
+ ImmutableSet<RelOptRule> dirPruneScanRules = ImmutableSet.<RelOptRule>builder()
+ .addAll(DrillRuleSets.getDirPruneScanRules(context))
+ .build();
+
+ relNode = doHepPlan(relNode, dirPruneScanRules, HepMatchOrder.BOTTOM_UP);
+ log("Post-Dir-Pruning", relNode, logger);
+
if (! context.getPlannerSettings().isHepOptEnabled()) {
return planner.transform(DrillSqlWorker.LOGICAL_RULES, relNode.getTraitSet().plus(DrillRel.DRILL_LOGICAL), relNode);
} else {
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 0d49b5c..bc3cef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -52,7 +52,7 @@ public class FileSelection {
* @param files list of files
* @param selectionRoot root path for selections
*/
- protected FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+ public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
this.statuses = statuses;
this.files = files;
this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
@@ -246,4 +246,8 @@ public class FileSelection {
return statuses;
}
+ public boolean supportDirPrunig() {
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 4473c5c..f802fb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -61,4 +61,8 @@ public class FormatSelection {
return selection;
}
+ @JsonIgnore
+ public boolean supportDirPruning() {
+ return selection.supportDirPrunig();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
index 33dccd6..93201bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
@@ -59,4 +59,8 @@ public class ParquetFileSelection extends FileSelection {
return new ParquetFileSelection(selection, metadata);
}
+ @Override
+ public boolean supportDirPrunig() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0ca6bb9..97df2ee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1195,8 +1195,4 @@ public class TestExampleQueries extends BaseTestQuery {
.run();
}
- @Test
- public void t() throws Exception {
- test("explain plan for select a from dfs.tmp.foo where dir0 = 1");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index ba70788..e5d6603 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -345,4 +345,20 @@ public class TestPartitionFilter extends PlanTestBase {
testIncludeFilter(query, 1, "Filter", 1);
}
+ @Test
+ public void testLogicalDirPruning() throws Exception {
+ // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet.
+ // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan.
+ String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995", TEST_RES_PATH);
+ testExcludeFilter(query, 1, "Filter", 10);
+ }
+
+ @Test
+ public void testLogicalDirPruning2() throws Exception {
+ // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet.
+ // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan.
+ String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995 and o_custkey > 0", TEST_RES_PATH);
+ testIncludeFilter(query, 1, "Filter", 10);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet
new file mode 100644
index 0000000..93514c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
new file mode 100644
index 0000000..f62d37d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
@@ -0,0 +1 @@
+BAD FORMAT!!!
[2/4] drill git commit: DRILL-2517: (Prototype from Mehant) Move
directory based partition pruning to logical phase.
Posted by jn...@apache.org.
DRILL-2517: (Prototype from Mehant) Move directory based partition pruning to logical phase.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7dfba2e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7dfba2e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7dfba2e
Branch: refs/heads/master
Commit: c7dfba2e62fc6b063e953bd9cb1fc5b2903edc79
Parents: 09de31e
Author: Mehant Baid <me...@gmail.com>
Authored: Tue Nov 10 22:26:26 2015 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800
----------------------------------------------------------------------
.../HivePushPartitionFilterIntoScan.java | 9 +-
.../planner/FileSystemPartitionDescriptor.java | 27 +-
.../drill/exec/planner/logical/DrillTable.java | 8 +-
.../logical/partition/ParquetPruneScanRule.java | 9 +-
.../logical/partition/PruneScanRule.java | 268 +++++++++++++++++--
.../drill/exec/store/dfs/FileSelection.java | 4 +
.../org/apache/drill/TestExampleQueries.java | 7 +-
7 files changed, 285 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index fc2007e..17cd65a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.sql.logical;
+import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -43,8 +44,8 @@ public abstract class HivePushPartitionFilterIntoScan {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new HivePartitionDescriptor(settings, scanRel, getOptimizerRulesContext().getManagedBuffer(),
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
@@ -77,8 +78,8 @@ public abstract class HivePushPartitionFilterIntoScan {
"HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new HivePartitionDescriptor(settings, scanRel, getOptimizerRulesContext().getManagedBuffer(),
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index c4e4cb9..02658ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
@@ -35,6 +37,7 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -50,12 +53,14 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
private final String partitionLabel;
private final int partitionLabelLength;
private final Map<String, Integer> partitions = Maps.newHashMap();
- private final DrillScanRel scanRel;
+ private final EnumerableTableScan scanRel;
+ private final DynamicDrillTable table;
- public FileSystemPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
this.partitionLabel = settings.getFsPartitionColumnLabel();
this.partitionLabelLength = partitionLabel.length();
- this.scanRel = scanRel;
+ this.scanRel = (EnumerableTableScan) scanRel;
+ table = scanRel.getTable().unwrap(DynamicDrillTable.class);
for(int i =0; i < 10; i++){
partitions.put(partitionLabel + i, i);
}
@@ -84,9 +89,17 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
- final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
- final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
+ /*
+ THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN??
+ final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
+ final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
return newScan;
+ */
+ return null;
+ }
+
+ public DynamicDrillTable getTable() {
+ return table;
}
@Override
@@ -124,13 +137,13 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
}
private String getBaseTableLocation() {
- final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
+ final FormatSelection origSelection = (FormatSelection) table.getSelection();
return origSelection.getSelection().selectionRoot;
}
@Override
protected void createPartitionSublists() {
- List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
+ List<String> fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
List<PartitionLocation> locations = new LinkedList<>();
for (String file: fileLocations) {
locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 106290d..1cb83b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -85,14 +85,14 @@ public abstract class DrillTable implements Table {
return selection;
}
- public void modifySelection(Object selection) {
- this.selection = selection;
- }
-
public String getStorageEngineName() {
return storageEngineName;
}
+ public String getUserName() {
+ return userName;
+ }
+
@Override
public Statistic getStatistic() {
return Statistics.UNKNOWN;
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index b4f4e95..d92d2b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.logical.partition;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -41,8 +42,8 @@ public class ParquetPruneScanRule {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new ParquetPartitionDescriptor(settings, scanRel);
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
@@ -73,8 +74,8 @@ public class ParquetPruneScanRule {
"PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new ParquetPartitionDescriptor(settings, scanRel);
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 4cc9c46..bfd6597 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -25,12 +25,14 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionFunction;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
@@ -53,13 +55,15 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -72,6 +76,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.exec.vector.ValueVector;
+
public abstract class PruneScanRule extends StoragePluginOptimizerRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
@@ -84,71 +89,280 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
- RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
+ RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
"PruneScanRule:Filter_On_Project",
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
return new FileSystemPartitionDescriptor(settings, scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
+ /*
final DrillScanRel scan = (DrillScanRel) call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for dfs based partition pruning
- if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
- } else {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- }
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ */
+ return true;
}
@Override
public void onMatch(RelOptRuleCall call) {
- final FilterRel filterRel = (DrillFilterRel) call.rel(0);
- final ProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final ScanRel scanRel = (DrillScanRel) call.rel(2);
- doOnMatch(call, filterRel, projectRel, scanRel);
+ final LogicalFilter filterRel = call.rel(0);
+ final Project projectRel = call.rel(1);
+ final EnumerableTableScan scanRel = call.rel(2);
+ doOnMatchLogical(call, filterRel, projectRel, scanRel);
}
};
}
public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
- RelOptHelper.some(Filter.class, RelOptHelper.any(EnumerableTableScan.class)),
+ RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)),
"PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
return new FileSystemPartitionDescriptor(settings, scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(1);
+ /* final DrillScanRel scan = (DrillScanRel) call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for dfs based partition pruning
- if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
- } else {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- }
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ */
+ return true;
}
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
- doOnMatch(call, filterRel, null, scanRel);
+ final LogicalFilter filterRel = call.rel(0);
+ final EnumerableTableScan scanRel = call.rel(1);
+ doOnMatchLogical(call, filterRel, null, scanRel);
}
};
}
- protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, DrillScanRel scanRel) {
- final DrillTable table = scanRel.getTable().unwrap(DrillTable.class);
+ // TODO: Combine the doOnMatch and doOnMatchLogical
+ protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) {
+ final String pruningClassName = getClass().getName();
+ logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
+ Stopwatch totalPruningTime = new Stopwatch();
+ totalPruningTime.start();
+
+
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
+ final BufferAllocator allocator = optimizerContext.getAllocator();
+
+
+ RexNode condition = null;
+ if (projectRel == null) {
+ condition = filterRel.getCondition();
+ } else {
+ // get the filter as if it were below the projection.
+ condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+ }
+
+ RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
+ condition = condition.accept(visitor);
+
+ Map<Integer, String> fieldNameMap = Maps.newHashMap();
+ List<String> fieldNames = scanRel.getRowType().getFieldNames();
+ BitSet columnBitset = new BitSet();
+ BitSet partitionColumnBitSet = new BitSet();
+
+ int relColIndex = 0;
+ for (String field : fieldNames) {
+ final Integer partitionIndex = descriptor.getIdIfValid(field);
+ if (partitionIndex != null) {
+ fieldNameMap.put(partitionIndex, field);
+ partitionColumnBitSet.set(partitionIndex);
+ columnBitset.set(relColIndex);
+ }
+ relColIndex++;
+ }
+
+ if (partitionColumnBitSet.isEmpty()) {
+ logger.info("No partition columns are projected from the scan..continue. " +
+ "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return;
+ }
+
+ // stop watch to track how long we spend in different phases of pruning
+ Stopwatch miscTimer = new Stopwatch();
+
+ // track how long we spend building the filter tree
+ miscTimer.start();
+
+ FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
+ c.analyze(condition);
+ RexNode pruneCondition = c.getFinalCondition();
+
+ logger.info("Total elapsed time to build and analyze filter tree: {} ms",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS));
+ miscTimer.reset();
+
+ if (pruneCondition == null) {
+ logger.info("No conditions were found eligible for partition pruning." +
+ "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return;
+ }
+
+ // set up the partitions
+ List<String> newFiles = Lists.newArrayList();
+ long numTotal = 0; // total number of partitions
+ int batchIndex = 0;
+ String firstLocation = null;
+ LogicalExpression materializedExpr = null;
+
+ // Outer loop: iterate over a list of batches of PartitionLocations
+ for (List<PartitionLocation> partitions : descriptor) {
+ numTotal += partitions.size();
+ logger.debug("Evaluating partition pruning for batch {}", batchIndex);
+ if (batchIndex == 0) { // save the first location in case everything is pruned
+ firstLocation = partitions.get(0).getEntirePartitionLocation();
+ }
+ final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
+ final VectorContainer container = new VectorContainer();
+
+ try {
+ final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+ for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
+ SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+ MajorType type = descriptor.getVectorType(column, settings);
+ MaterializedField field = MaterializedField.create(column, type);
+ ValueVector v = TypeHelper.getNewVector(field, allocator);
+ v.allocateNew();
+ vectors[partitionColumnIndex] = v;
+ container.add(v);
+ }
+
+ // track how long we spend populating partition column vectors
+ miscTimer.start();
+
+ // populate partition vectors.
+ descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
+
+ logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+ miscTimer.reset();
+
+ // materialize the expression; only need to do this once
+ if (batchIndex == 0) {
+ materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
+ if (materializedExpr == null) {
+ // continue without partition pruning; no need to log anything here since
+ // materializePruneExpr logs it already
+ logger.info("Total pruning elapsed time: {} ms",
+ totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return;
+ }
+ }
+
+ output.allocateNew(partitions.size());
+
+ // start the timer to evaluate how long we spend in the interpreter evaluation
+ miscTimer.start();
+
+ InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
+
+ logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+ miscTimer.reset();
+
+ int recordCount = 0;
+ int qualifiedCount = 0;
+
+ // Inner loop: within each batch iterate over the PartitionLocations
+ for(PartitionLocation part: partitions){
+ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
+ newFiles.add(part.getEntirePartitionLocation());
+ qualifiedCount++;
+ }
+ recordCount++;
+ }
+ logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
+ batchIndex++;
+ } catch (Exception e) {
+ logger.warn("Exception while trying to prune partition.", e);
+ logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return; // continue without partition pruning
+ } finally {
+ container.clear();
+ if (output != null) {
+ output.clear();
+ }
+ }
+ }
+
+ try {
+
+ boolean canDropFilter = true;
+
+ if (newFiles.isEmpty()) {
+ assert firstLocation != null;
+ newFiles.add(firstLocation);
+ canDropFilter = false;
+ }
+
+ if (newFiles.size() == numTotal) {
+ logger.info("No partitions were eligible for pruning");
+ return;
+ }
+
+ logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
+
+ List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
+ List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
+ conjuncts.removeAll(pruneConjuncts);
+ RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
+
+ RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());
+
+ condition = condition.accept(reverseVisitor);
+ pruneCondition = pruneCondition.accept(reverseVisitor);
+
+ RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable();
+ DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable();
+ FormatSelection formatSelection = (FormatSelection) oldTable.getSelection();
+ FileSelection oldFileSelection = formatSelection.getSelection();
+ FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses());
+ FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+ DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(),
+ oldTable.getUserName(), newFormatSelection);
+ RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
+
+ // TODO: The new scan should come from the PartitionDescriptor
+ // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan
+ EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl);
+
+ RelNode inputRel = newScan;
+
+ if (projectRel != null) {
+ inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+ }
+
+ if (newCondition.isAlwaysTrue() && canDropFilter) {
+ call.transformTo(inputRel);
+ } else {
+ final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
+ call.transformTo(newFilter);
+ }
+
+ } catch (Exception e) {
+ logger.warn("Exception while using the pruned partitions.", e);
+ } finally {
+ logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = new Stopwatch();
@@ -388,5 +602,5 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return optimizerContext;
}
- public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel);
+ public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 6df3ffc..0d49b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -242,4 +242,8 @@ public class FileSelection {
}
}
+ public List<FileStatus> getFileStatuses() {
+ return statuses;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0b52b9e..0ca6bb9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1019,7 +1019,7 @@ public class TestExampleQueries extends BaseTestQuery {
.sqlQuery(query)
.expectsEmptyResultSet()
.optionSettingQueriesForTestQuery("ALTER SESSION SET `planner.enable_hashjoin` = false; " +
- "ALTER SESSION SET `planner.disable_exchanges` = true")
+ "ALTER SESSION SET `planner.disable_exchanges` = true")
.build()
.run();
@@ -1194,4 +1194,9 @@ public class TestExampleQueries extends BaseTestQuery {
.build()
.run();
}
+
+ @Test
+ public void t() throws Exception {
+ test("explain plan for select a from dfs.tmp.foo where dir0 = 1");
+ }
}
\ No newline at end of file