Posted to commits@drill.apache.org by sm...@apache.org on 2015/06/24 10:30:33 UTC
drill git commit: DRILL-3333: Parquet writer auto-partitioning and partition pruning
Repository: drill
Updated Branches:
refs/heads/master 3aa82bc92 -> 5a34d8194
DRILL-3333: Parquet writer auto-partitioning and partition pruning
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5a34d819
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5a34d819
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5a34d819
Branch: refs/heads/master
Commit: 5a34d8194a660f82391e1143f445a7a890340e34
Parents: 3aa82bc
Author: Steven Phillips <sm...@apache.org>
Authored: Tue Jun 23 18:41:12 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Jun 24 00:48:01 2015 -0700
----------------------------------------------------------------------
.../codegen/templates/AbstractRecordWriter.java | 18 +
.../templates/EventBasedRecordWriter.java | 8 +
.../codegen/templates/NewValueFunctions.java | 103 +++
.../main/codegen/templates/RecordWriter.java | 10 +-
.../templates/StringOutputRecordWriter.java | 11 +-
.../exec/physical/base/AbstractGroupScan.java | 6 +
.../drill/exec/physical/base/GroupScan.java | 7 +
.../exec/physical/impl/WriterRecordBatch.java | 2 +-
.../planner/ParquetPartitionDescriptor.java | 62 ++
.../exec/planner/logical/DrillRuleSets.java | 2 +
.../logical/partition/PruneScanRule.java | 713 ++++++++++++-------
.../sql/handlers/CreateTableHandler.java | 4 +-
.../drill/exec/store/NewValueFunction.java | 209 ++++++
.../exec/store/easy/json/JsonRecordWriter.java | 3 +-
.../exec/store/parquet/ParquetGroupScan.java | 310 +++++++-
.../exec/store/parquet/ParquetRecordWriter.java | 42 +-
.../exec/store/text/DrillTextRecordWriter.java | 1 +
17 files changed, 1237 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 6b6065f..5f1f42f 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+import java.lang.UnsupportedOperationException;
+
<@pp.dropOutputFile />
<@pp.changeOutputFile name="org/apache/drill/exec/store/AbstractRecordWriter.java" />
<#include "/@includes/license.ftl" />
@@ -24,6 +26,8 @@ package org.apache.drill.exec.store;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.BitVector.Accessor;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import java.io.IOException;
@@ -31,6 +35,20 @@ import java.lang.UnsupportedOperationException;
public abstract class AbstractRecordWriter implements RecordWriter {
+ private Accessor newPartitionVector;
+
+ protected void setPartitionVector(BitVector newPartitionVector) {
+ this.newPartitionVector = newPartitionVector.getAccessor();
+ }
+
+ protected boolean newPartition(int index) {
+ return newPartitionVector.get(index) == 1;
+ }
+
+ public void checkForNewPartition(int index) {
+ // no op
+ }
+
@Override
public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
throw new UnsupportedOperationException("Doesn't support writing Map'");
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index 797f3cb..cf1529d 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+import org.apache.drill.exec.planner.physical.WriterPrel;
+
<@pp.dropOutputFile />
<@pp.changeOutputFile name="org/apache/drill/exec/store/EventBasedRecordWriter.java" />
<#include "/@includes/license.ftl" />
@@ -25,6 +27,8 @@ package org.apache.drill.exec.store;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -54,6 +58,7 @@ public class EventBasedRecordWriter {
int counter = 0;
for (; counter < recordCount; counter++) {
+ recordWriter.checkForNewPartition(counter);
recordWriter.startRecord();
// write the current record
for (FieldConverter converter : fieldConverters) {
@@ -73,6 +78,9 @@ public class EventBasedRecordWriter {
try {
int fieldId = 0;
for (VectorWrapper w : batch) {
+ if (w.getField().getPath().equals(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD))) {
+ continue;
+ }
FieldReader reader = w.getValueVector().getReader();
FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getLastName(), reader);
fieldConverters.add(converter);
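
[Editor's note] The write loop above now calls checkForNewPartition(counter) before every record, and the extra partition-comparator column is skipped when the field converters are built. A minimal standalone sketch of that control flow (plain Java, not Drill code; the flag array stands in for the bit vector produced by newPartitionValue, and the file-rollover step is hypothetical):

import java.util.Arrays;
import java.util.List;

public class PartitionRolloverSketch {
  public static void main(String[] args) {
    List<String> rows = Arrays.asList("a", "a", "b", "b", "c");
    int[] newPartitionFlags = {1, 0, 1, 0, 1};   // what newPartitionValue() would emit for this column
    int fileNumber = -1;
    for (int i = 0; i < rows.size(); i++) {
      // EventBasedRecordWriter calls recordWriter.checkForNewPartition(i) at this point;
      // a writer that overrides it can roll to a new file when the flag is set.
      if (newPartitionFlags[i] == 1) {
        fileNumber++;                            // close the current file, open the next one
      }
      System.out.println("file_" + fileNumber + " <- row " + rows.get(i));
    }
  }
}
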
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
new file mode 100644
index 0000000..b8ba4cc
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/GNewValueFunctions.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.*;
+import javax.inject.Inject;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class GNewValueFunctions {
+<#list vv.types as type>
+<#if type.major == "Fixed">
+
+<#list type.minor as minor>
+<#list vv.modes as mode>
+ <#if mode.name != "Repeated">
+
+<#if !minor.class.startsWith("Decimal28") && !minor.class.startsWith("Decimal38") && !minor.class.startsWith("Interval")>
+@SuppressWarnings("unused")
+@FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc{
+
+ @Param ${mode.prefix}${minor.class}Holder in;
+ @Workspace ${mode.prefix}${minor.class}Holder previous;
+ @Workspace Boolean initialized;
+ @Output BitHolder out;
+
+ public void setup() {
+ initialized = false;
+ }
+
+ <#if mode.name == "Required">
+ public void eval() {
+ if (initialized) {
+ if (in.value == previous.value) {
+ out.value = 0;
+ } else {
+ previous.value = in.value;
+ out.value = 1;
+ }
+ } else {
+ previous.value = in.value;
+ out.value = 1;
+ initialized = true;
+ }
+ }
+ </#if>
+ <#if mode.name == "Optional">
+ public void eval() {
+ if (initialized) {
+ if (in.isSet == 0 && previous.isSet == 0) {
+ out.value = 0;
+ } else if (in.value == previous.value) {
+ out.value = 0;
+ } else {
+ previous.value = in.value;
+ previous.isSet = in.isSet;
+ out.value = 1;
+ }
+ } else {
+ previous.value = in.value;
+ previous.isSet = in.isSet;
+ out.value = 1;
+ initialized = true;
+ }
+ }
+ </#if>
+}
+</#if> <#-- minor.class.startWith -->
+
+</#if> <#-- mode.name -->
+</#list>
+</#list>
+</#if> <#-- type.major -->
+</#list>
+}
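
[Editor's note] The template above generates one newPartitionValue function per fixed-width type and mode; each instance remembers the previous row's value in its workspace and outputs a bit that is 1 when the value changes (or on the first row). A self-contained sketch of the same semantics in plain Java:

public class NewValueSemanticsDemo {
  // Mirrors the Required-mode eval() above: 1 when the value differs from the
  // previous row or it is the first row, 0 otherwise.
  public static int[] newPartitionValue(long[] column) {
    int[] out = new int[column.length];
    boolean initialized = false;
    long previous = 0;
    for (int i = 0; i < column.length; i++) {
      if (!initialized || column[i] != previous) {
        previous = column[i];
        out[i] = 1;          // value changed -> start of a new partition
        initialized = true;
      } else {
        out[i] = 0;          // same as previous row -> same partition
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // On sorted input 10,10,20,20,20,30 the flags are 1 0 1 0 0 1.
    for (int flag : newPartitionValue(new long[]{10, 10, 20, 20, 20, 30})) {
      System.out.print(flag + " ");
    }
  }
}
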
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/RecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index c6325fd..a37ffa8 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,7 @@ package org.apache.drill.exec.store;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -43,10 +44,15 @@ public interface RecordWriter {
/**
* Update the schema in RecordWriter. Called at least once before starting writing the records.
- * @param schema
+ * @param batch
* @throws IOException
*/
- void updateSchema(BatchSchema schema) throws IOException;
+ void updateSchema(VectorAccessible batch) throws IOException;
+
+ /**
+ * Check if the writer should start a new partition, and if so, start a new partition
+ */
+ public void checkForNewPartition(int index);
/**
* Called before starting writing fields in a record.
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index f704cca..c175900 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+import org.apache.drill.exec.store.AbstractRecordWriter;
+
import java.lang.Override;
import java.lang.UnsupportedOperationException;
@@ -31,6 +33,7 @@ import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.*;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -48,14 +51,16 @@ import java.util.Map;
*
* This is useful for text format writers such as CSV, TSV etc.
*/
-public abstract class StringOutputRecordWriter implements RecordWriter {
+public abstract class StringOutputRecordWriter extends AbstractRecordWriter {
private final BufferAllocator allocator;
protected StringOutputRecordWriter(BufferAllocator allocator){
this.allocator = allocator;
}
-
- public void updateSchema(BatchSchema schema) throws IOException {
+
+ @Override
+ public void updateSchema(VectorAccessible batch) throws IOException {
+ BatchSchema schema = batch.getSchema();
List<String> columnNames = Lists.newArrayList();
for (int i=0; i < schema.getFieldCount(); i++) {
columnNames.add(schema.getColumn(i).getLastName());
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 5c4ee4d..1277ec4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -122,4 +123,9 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
public int getOperatorType() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public List<SchemaPath> getPartitionColumns() {
+ return Lists.newArrayList();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 2d16cd0..946c7e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -91,4 +91,11 @@ public interface GroupScan extends Scan, HasAffinity{
*/
public boolean supportsPartitionFilterPushdown();
+ /**
+ * Returns a list of columns that can be used for partition pruning
+ *
+ */
+ @JsonIgnore
+ public List<SchemaPath> getPartitionColumns();
+
}
\ No newline at end of file
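
[Editor's note] getPartitionColumns() lets the planner ask a group scan which columns can drive pruning. For Parquet, this commit treats a column as a partition column when it holds a single value within every file of the scan (the ParquetGroupScan changes, truncated at the end of this message, derive that from per-file metadata). A standalone sketch of that criterion, with made-up file names and values:

import java.util.*;

public class PartitionColumnDetectionDemo {
  public static void main(String[] args) {
    // per file: column -> set of distinct values observed in that file
    Map<String, Map<String, Set<String>>> fileColumnValues = new LinkedHashMap<>();
    fileColumnValues.put("0_0_1.parquet", Map.of("year", Set.of("2015"), "amount", Set.of("1", "7")));
    fileColumnValues.put("0_0_2.parquet", Map.of("year", Set.of("2016"), "amount", Set.of("3")));

    // a column survives only if it is single-valued in every file
    Set<String> partitionColumns = new LinkedHashSet<>(Arrays.asList("year", "amount"));
    for (Map<String, Set<String>> columns : fileColumnValues.values()) {
      partitionColumns.removeIf(col -> columns.get(col) == null || columns.get(col).size() != 1);
    }
    System.out.println("Prunable columns: " + partitionColumns);   // [year]
  }
}
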
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index d5d64a7..5fe7667 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -150,7 +150,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
try {
// update the schema in RecordWriter
stats.startSetup();
- recordWriter.updateSchema(incoming.getSchema());
+ recordWriter.updateSchema(incoming);
// Create two vectors for:
// 1. Fragment unique id.
// 2. Summary: currently contains number of records written.
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
new file mode 100644
index 0000000..127e70a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * PartitionDescriptor that describes partitions based on column names instead of directory structure
+ */
+public class ParquetPartitionDescriptor implements PartitionDescriptor {
+
+ private final List<SchemaPath> partitionColumns;
+
+ public ParquetPartitionDescriptor(List<SchemaPath> partitionColumns) {
+ this.partitionColumns = partitionColumns;
+ }
+
+ @Override
+ public int getPartitionHierarchyIndex(String partitionName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isPartitionName(String name) {
+ return partitionColumns.contains(name);
+ }
+
+ @Override
+ public Integer getIdIfValid(String name) {
+ SchemaPath schemaPath = SchemaPath.getSimplePath(name);
+ int id = partitionColumns.indexOf(schemaPath);
+ if (id == -1) {
+ return null;
+ }
+ return id;
+ }
+
+ @Override
+ public int getMaxHierarchyLevel() {
+ return partitionColumns.size();
+ }
+}
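
[Editor's note] ParquetPartitionDescriptor maps row-type field names to positions in the scan's partition column list; getIdIfValid returns null for anything that is not a partition column, which is how PruneScanRule decides which filter columns are usable. A standalone illustration of that lookup with plain strings:

import java.util.Arrays;
import java.util.List;

public class PartitionColumnLookupDemo {
  // Same shape as getIdIfValid(): position in the partition column list, or null.
  static Integer getIdIfValid(List<String> partitionColumns, String name) {
    int id = partitionColumns.indexOf(name);
    return id == -1 ? null : id;
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("year", "month");
    System.out.println(getIdIfValid(cols, "month"));   // 1
    System.out.println(getIdIfValid(cols, "amount"));  // null (not prunable)
  }
}
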
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index d9b1354..daa7276 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -155,6 +155,8 @@ public class DrillRuleSets {
PruneScanRule.getFilterOnProject(context),
PruneScanRule.getFilterOnScan(context),
+ PruneScanRule.getFilterOnProjectParquet(context),
+ PruneScanRule.getFilterOnScanParquet(context),
/*
Convert from Calcite Logical to Drill Logical Rules.
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 2544d34..c8c7db6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -1,84 +1,214 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.drill.exec.planner.logical.partition;
+import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.util.BitSets;
-
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.base.FileGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
-import org.apache.drill.exec.planner.logical.DrillFilterRel;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.store.dfs.FileSelection;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+
+ import org.apache.calcite.rex.RexUtil;
+ import org.apache.calcite.util.BitSets;
+
+ import org.apache.drill.common.expression.ErrorCollectorImpl;
+ import org.apache.drill.common.expression.LogicalExpression;
+ import org.apache.drill.common.expression.SchemaPath;
+ import org.apache.drill.common.types.TypeProtos.MajorType;
+ import org.apache.drill.common.types.TypeProtos.MinorType;
+ import org.apache.drill.common.types.Types;
+ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+ import org.apache.drill.exec.expr.TypeHelper;
+ import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
+ import org.apache.drill.exec.memory.BufferAllocator;
+ import org.apache.drill.exec.ops.QueryContext;
+ import org.apache.drill.exec.physical.base.FileGroupScan;
+ import org.apache.drill.exec.physical.base.GroupScan;
+ import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
+import org.apache.drill.exec.planner.ParquetPartitionDescriptor;
+import org.apache.drill.exec.planner.PartitionDescriptor;
+ import org.apache.drill.exec.planner.logical.DrillFilterRel;
+ import org.apache.drill.exec.planner.logical.DrillOptiq;
+ import org.apache.drill.exec.planner.logical.DrillParseContext;
+ import org.apache.drill.exec.planner.logical.DrillProjectRel;
+ import org.apache.drill.exec.planner.logical.DrillRel;
+ import org.apache.drill.exec.planner.logical.DrillScanRel;
+ import org.apache.drill.exec.planner.logical.RelOptHelper;
+ import org.apache.drill.exec.planner.physical.PlannerSettings;
+ import org.apache.drill.exec.planner.physical.PrelUtil;
+ import org.apache.drill.exec.record.MaterializedField;
+ import org.apache.drill.exec.record.VectorContainer;
+ import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+ import org.apache.drill.exec.vector.NullableVarCharVector;
+ import org.apache.calcite.rel.RelNode;
+ import org.apache.calcite.plan.RelOptRule;
+ import org.apache.calcite.plan.RelOptRuleCall;
+ import org.apache.calcite.plan.RelOptRuleOperand;
+ import org.apache.calcite.plan.RelOptUtil;
+ import org.apache.calcite.rex.RexNode;
+
+ import com.google.common.base.Charsets;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import org.apache.drill.exec.vector.ValueVector;
public abstract class PruneScanRule extends RelOptRule {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
+
+ public static final RelOptRule getFilterOnProject(QueryContext context){
+ return new PruneScanRule(
+ RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+ "PruneScanRule:Filter_On_Project",
+ context) {
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final DrillScanRel scan = (DrillScanRel) call.rel(2);
+ GroupScan groupScan = scan.getGroupScan();
+ // this rule is applicable only for dfs based partition pruning
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+ final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
+ final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+ doOnMatch(call, filterRel, projectRel, scanRel);
+ };
+
+ @Override
+ protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ return new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
+ }
+
+ @Override
+ protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+ int record = 0;
+ for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+ final PathPartition partition = iter.next();
+ for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+ if(partition.dirs[partitionColumnIndex] == null){
+ ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record);
+ }else{
+ byte[] bytes = partition.dirs[partitionColumnIndex].getBytes(Charsets.UTF_8);
+ ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length);
+ }
+ }
+ }
+
+ for(ValueVector v : vectors){
+ if(v == null){
+ continue;
+ }
+ v.getMutator().setValueCount(partitions.size());
+ }
+ }
+
+ @Override
+ protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+ return Types.optional(MinorType.VARCHAR);
+ }
+
+ @Override
+ protected List<String> getFiles(DrillScanRel scanRel) {
+ return ((FormatSelection)scanRel.getDrillTable().getSelection()).getAsFiles();
+ }
+ };
+ }
+
+ public static final RelOptRule getFilterOnScan(QueryContext context){
+ return new PruneScanRule(
+ RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+ "PruneScanRule:Filter_On_Scan", context) {
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final DrillScanRel scan = (DrillScanRel) call.rel(1);
+ GroupScan groupScan = scan.getGroupScan();
+ // this rule is applicable only for dfs based partition pruning
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+ final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+ doOnMatch(call, filterRel, null, scanRel);
+ }
+
+ @Override
+ protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ return new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
+ }
+
+ @Override
+ protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+ int record = 0;
+ for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+ final PathPartition partition = iter.next();
+ for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+ if(partition.dirs[partitionColumnIndex] == null){
+ ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record);
+ }else{
+ byte[] bytes = partition.dirs[partitionColumnIndex].getBytes(Charsets.UTF_8);
+ ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length);
+ }
+ }
+ }
+
+ for(ValueVector v : vectors){
+ if(v == null){
+ continue;
+ }
+ v.getMutator().setValueCount(partitions.size());
+ }
+ }
+
+ @Override
+ protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+ return Types.optional(MinorType.VARCHAR);
+ }
+
+ @Override
+ protected List<String> getFiles(DrillScanRel scanRel) {
+ return ((FormatSelection)scanRel.getDrillTable().getSelection()).getAsFiles();
+ }
+ };
+ }
- public static final RelOptRule getFilterOnProject(QueryContext context){
- return new PruneScanRule(
- RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
- "PruneScanRule:Filter_On_Project",
- context) {
+ public static final RelOptRule getFilterOnProjectParquet(QueryContext context){
+ return new PruneScanRule(
+ RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+ "PruneScanRule:Filter_On_Project_Parquet",
+ context) {
@Override
- public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(2);
- GroupScan groupScan = scan.getGroupScan();
- // this rule is applicable only for dfs based partition pruning
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- }
+ public boolean matches(RelOptRuleCall call) {
+ final DrillScanRel scan = (DrillScanRel) call.rel(2);
+ GroupScan groupScan = scan.getGroupScan();
+ // this rule is applicable only for dfs based partition pruning
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ }
@Override
public void onMatch(RelOptRuleCall call) {
@@ -87,222 +217,291 @@ public abstract class PruneScanRule extends RelOptRule {
final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel);
};
- };
- }
-
- public static final RelOptRule getFilterOnScan(QueryContext context){
- return new PruneScanRule(
- RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
- "PruneScanRule:Filter_On_Scan", context) {
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(1);
- GroupScan groupScan = scan.getGroupScan();
- // this rule is applicable only for dfs based partition pruning
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- }
@Override
- public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
- doOnMatch(call, filterRel, null, scanRel);
+ protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ return new ParquetPartitionDescriptor(scanRel.getGroupScan().getPartitionColumns());
}
- };
- }
-
- final QueryContext context;
-
- private PruneScanRule(RelOptRuleOperand operand, String id, QueryContext context) {
- super(operand, id);
- this.context = context;
- }
- protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
- final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
- FileSystemPartitionDescriptor descriptor = new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
- final BufferAllocator allocator = context.getAllocator();
-
-
- RexNode condition = null;
- if(projectRel == null){
- condition = filterRel.getCondition();
- }else{
- // get the filter as if it were below the projection.
- condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
- }
-
- Map<Integer, String> dirNames = Maps.newHashMap();
- List<String> fieldNames = scanRel.getRowType().getFieldNames();
- BitSet columnBitset = new BitSet();
- BitSet dirBitset = new BitSet();
- {
- int colIndex = 0;
- for(String field : fieldNames){
- final Integer dirIndex = descriptor.getIdIfValid(field);
- if(dirIndex != null){
- dirNames.put(dirIndex, field);
- dirBitset.set(dirIndex);
- columnBitset.set(colIndex);
+ @Override
+ protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+ int record = 0;
+ for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+ final PathPartition partition = iter.next();
+ for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+ SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+ ((ParquetGroupScan)groupScan).populatePruningVector(vectors[partitionColumnIndex], record, column, partition.file);
+ }
}
- colIndex++;
- }
- }
-
- if(dirBitset.isEmpty()){
- return;
- }
-
- FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
- c.analyze(condition);
- RexNode pruneCondition = c.getFinalCondition();
-
- if(pruneCondition == null){
- return;
- }
-
- // set up the partitions
- final FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection();
- final List<String> files = origSelection.getAsFiles();
- final String selectionRoot = origSelection.getSelection().selectionRoot;
- List<PathPartition> partitions = Lists.newLinkedList();
-
- // let's only deal with one batch of files for now.
- if(files.size() > Character.MAX_VALUE){
- return;
- }
-
- for(String f : files){
- partitions.add(new PathPartition(descriptor.getMaxHierarchyLevel(), selectionRoot, f));
- }
-
- final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
- final VectorContainer container = new VectorContainer();
-
- try{
- final NullableVarCharVector[] vectors = new NullableVarCharVector[descriptor.getMaxHierarchyLevel()];
- for(int dirIndex : BitSets.toIter(dirBitset)){
- NullableVarCharVector vector = new NullableVarCharVector(MaterializedField.create(dirNames.get(dirIndex), Types.optional(MinorType.VARCHAR)), allocator);
- vector.allocateNew(5000, partitions.size());
- vectors[dirIndex] = vector;
- container.add(vector);
- }
- // populate partition vectors.
- int record = 0;
- for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
- final PathPartition partition = iter.next();
- for(int dirIndex : BitSets.toIter(dirBitset)){
- if(partition.dirs[dirIndex] == null){
- vectors[dirIndex].getMutator().setNull(record);
- }else{
- byte[] bytes = partition.dirs[dirIndex].getBytes(Charsets.UTF_8);
- vectors[dirIndex].getMutator().setSafe(record, bytes, 0, bytes.length);
+ for(ValueVector v : vectors){
+ if(v == null){
+ continue;
}
+ v.getMutator().setValueCount(partitions.size());
}
}
- for(NullableVarCharVector v : vectors){
- if(v == null){
- continue;
- }
- v.getMutator().setValueCount(partitions.size());
+ @Override
+ protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+ return ((ParquetGroupScan)groupScan).getTypeForColumn(column);
}
-
- // materialize the expression
- logger.debug("Attempting to prune {}", pruneCondition);
- LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
- ErrorCollectorImpl errors = new ErrorCollectorImpl();
- LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, context.getFunctionRegistry());
- if (errors.getErrorCount() != 0) {
- logger.warn("Failure while materializing expression [{}]. Errors: {}", expr, errors);
+ @Override
+ protected List<String> getFiles(DrillScanRel scanRel) {
+ ParquetGroupScan groupScan = (ParquetGroupScan) scanRel.getGroupScan();
+ return new ArrayList(groupScan.getFileSet());
}
+ };
+ }
- output.allocateNew(partitions.size());
- InterpreterEvaluator.evaluate(partitions.size(), context, container, output, materializedExpr);
- record = 0;
-
- List<String> newFiles = Lists.newArrayList();
- for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
- PathPartition part = iter.next();
- if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
- newFiles.add(part.file);
- }
- }
+ // Using separate rules for Parquet column based partition pruning. In the future, we may want to see if we can combine these into
+ // a single rule which handles both types of pruning
- boolean canDropFilter = true;
+ public static final RelOptRule getFilterOnScanParquet(QueryContext context){
+ return new PruneScanRule(
+ RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+ "PruneScanRule:Filter_On_Scan_Parquet", context) {
- if(newFiles.isEmpty()){
- newFiles.add(files.get(0));
- canDropFilter = false;
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final DrillScanRel scan = (DrillScanRel) call.rel(1);
+ GroupScan groupScan = scan.getGroupScan();
+ // this rule is applicable only for dfs based partition pruning
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
}
- if(newFiles.size() == files.size()){
- return;
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+ final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+ doOnMatch(call, filterRel, null, scanRel);
}
- logger.debug("Pruned {} => {}", files, newFiles);
-
- List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
- List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
- conjuncts.removeAll(pruneConjuncts);
- RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
-
- final FileSelection newFileSelection = new FileSelection(newFiles, origSelection.getSelection().selectionRoot, true);
- final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
- final DrillScanRel newScanRel =
- new DrillScanRel(scanRel.getCluster(),
- scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
- scanRel.getTable(),
- newScan,
- scanRel.getRowType(),
- scanRel.getColumns());
+ @Override
+ protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ return new ParquetPartitionDescriptor(scanRel.getGroupScan().getPartitionColumns());
+ }
- RelNode inputRel = newScanRel;
+ @Override
+ protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+ int record = 0;
+ for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+ final PathPartition partition = iter.next();
+ for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+ SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+ ((ParquetGroupScan)groupScan).populatePruningVector(vectors[partitionColumnIndex], record, column, partition.file);
+ }
+ }
- if(projectRel != null){
- inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+ for(ValueVector v : vectors){
+ if(v == null){
+ continue;
+ }
+ v.getMutator().setValueCount(partitions.size());
+ }
}
- if (newCondition.isAlwaysTrue() && canDropFilter) {
- call.transformTo(inputRel);
- } else {
- final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
- call.transformTo(newFilter);
+ @Override
+ protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+ return ((ParquetGroupScan)groupScan).getTypeForColumn(column);
}
- }catch(Exception e){
- logger.warn("Exception while trying to prune partition.", e);
- }finally{
- container.clear();
- if(output !=null){
- output.clear();
+ @Override
+ protected List<String> getFiles(DrillScanRel scanRel) {
+ ParquetGroupScan groupScan = (ParquetGroupScan) scanRel.getGroupScan();
+ return new ArrayList(groupScan.getFileSet());
}
- }
+ };
}
- private static class PathPartition {
- final String[] dirs;
- final String file;
-
- public PathPartition(int max, String selectionRoot, String file){
- this.file = file;
- int start = file.indexOf(selectionRoot) + selectionRoot.length();
- String postPath = file.substring(start);
- if(postPath.charAt(0) == '/'){
- postPath = postPath.substring(1);
- }
- String[] mostDirs = postPath.split("/");
- this.dirs = new String[max];
- int maxLoop = Math.min(max, mostDirs.length - 1);
- for(int i =0; i < maxLoop; i++){
- this.dirs[i] = mostDirs[i];
- }
- }
+ final QueryContext context;
+
+ private PruneScanRule(RelOptRuleOperand operand, String id, QueryContext context) {
+ super(operand, id);
+ this.context = context;
+ }
+
+ protected abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel);
+
+ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
+ final BufferAllocator allocator = context.getAllocator();
+
+
+ RexNode condition = null;
+ if(projectRel == null){
+ condition = filterRel.getCondition();
+ }else{
+ // get the filter as if it were below the projection.
+ condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+ }
+
+
+ Map<Integer, String> fieldNameMap = Maps.newHashMap();
+ List<String> fieldNames = scanRel.getRowType().getFieldNames();
+ BitSet columnBitset = new BitSet();
+ BitSet partitionColumnBitSet = new BitSet();
+
+ {
+ int relColIndex = 0;
+ for(String field : fieldNames){
+ final Integer partitionIndex = descriptor.getIdIfValid(field);
+ if(partitionIndex != null){
+ fieldNameMap.put(partitionIndex, field);
+ partitionColumnBitSet.set(partitionIndex);
+ columnBitset.set(relColIndex);
+ }
+ relColIndex++;
+ }
+ }
+
+ if(partitionColumnBitSet.isEmpty()){
+ return;
+ }
+
+ FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
+ c.analyze(condition);
+ RexNode pruneCondition = c.getFinalCondition();
+
+ if(pruneCondition == null){
+ return;
+ }
+
+ // set up the partitions
+ final GroupScan groupScan = scanRel.getGroupScan();
+ final FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection();
+ final List<String> files = getFiles(scanRel);
+ final String selectionRoot = origSelection.getSelection().selectionRoot;
+ List<PathPartition> partitions = Lists.newLinkedList();
+
+ // let's only deal with one batch of files for now.
+ if(files.size() > Character.MAX_VALUE){
+ return;
+ }
+
+ for(String f : files){
+ partitions.add(new PathPartition(descriptor.getMaxHierarchyLevel(), selectionRoot, f));
+ }
+
+ final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
+ final VectorContainer container = new VectorContainer();
+
+ try{
+ final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+ for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+ SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+ MajorType type = getVectorType(groupScan, column);
+ MaterializedField field = MaterializedField.create(column, type);
+ ValueVector v = TypeHelper.getNewVector(field, allocator);
+ v.allocateNew();
+ vectors[partitionColumnIndex] = v;
+ container.add(v);
+ }
+
+ // populate partition vectors.
+
+ populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap, groupScan);
+
+ // materialize the expression
+ logger.debug("Attempting to prune {}", pruneCondition);
+ LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
+ ErrorCollectorImpl errors = new ErrorCollectorImpl();
+ LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, context.getFunctionRegistry());
+ if (errors.getErrorCount() != 0) {
+ logger.warn("Failure while materializing expression [{}]. Errors: {}", expr, errors);
+ }
+
+ output.allocateNew(partitions.size());
+ InterpreterEvaluator.evaluate(partitions.size(), context, container, output, materializedExpr);
+ int record = 0;
+
+ List<String> newFiles = Lists.newArrayList();
+ for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+ PathPartition part = iter.next();
+ if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
+ newFiles.add(part.file);
+ }
+ }
+
+ boolean canDropFilter = true;
+
+ if(newFiles.isEmpty()){
+ newFiles.add(files.get(0));
+ canDropFilter = false;
+ }
+
+ if(newFiles.size() == files.size()){
+ return;
+ }
+
+ logger.debug("Pruned {} => {}", files, newFiles);
+
+ List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
+ List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
+ conjuncts.removeAll(pruneConjuncts);
+ RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
+
+ final FileSelection newFileSelection = new FileSelection(newFiles, selectionRoot, true);
+ final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
+ final DrillScanRel newScanRel =
+ new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newScan,
+ scanRel.getRowType(),
+ scanRel.getColumns());
+
+ RelNode inputRel = newScanRel;
+
+ if(projectRel != null){
+ inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+ }
+
+ if (newCondition.isAlwaysTrue() && canDropFilter) {
+ call.transformTo(inputRel);
+ } else {
+ final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
+ call.transformTo(newFilter);
+ }
+
+ }catch(Exception e){
+ logger.warn("Exception while trying to prune partition.", e);
+ }finally{
+ container.clear();
+ if(output !=null){
+ output.clear();
+ }
+ }
+ }
+
+ protected abstract void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan);
+
+ protected abstract MajorType getVectorType(GroupScan groupScan, SchemaPath column);
+
+ protected abstract List<String> getFiles(DrillScanRel scanRel);
+
+ private static class PathPartition {
+ final String[] dirs;
+ final String file;
+
+ public PathPartition(int max, String selectionRoot, String file){
+ this.file = file;
+ int start = file.indexOf(selectionRoot) + selectionRoot.length();
+ String postPath = file.substring(start);
+ if(postPath.charAt(0) == '/'){
+ postPath = postPath.substring(1);
+ }
+ String[] mostDirs = postPath.split("/");
+ this.dirs = new String[max];
+ int maxLoop = Math.min(max, mostDirs.length - 1);
+ for(int i =0; i < maxLoop; i++){
+ this.dirs[i] = mostDirs[i];
+ }
+ }
- }
+ }
-}
+ }
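
[Editor's note] The refactored doOnMatch() above is the shared driver for both the directory-based and the new Parquet column-based rules: it builds one partition-vector entry per file, materializes the pushed-down condition, evaluates it with the interpreter, and rebuilds the scan over the files whose rows can match (keeping one file if none match so the scan is never empty, and dropping the filter only when the whole condition was consumed). A standalone sketch of that per-file pruning loop, with hypothetical file names and a simple in-memory predicate standing in for the interpreted expression:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PruneSketch {
  public static void main(String[] args) {
    // per file: the single value of the partition column in that file
    Map<String, Integer> filePartitionValue = new LinkedHashMap<>();
    filePartitionValue.put("t/0_0_1.parquet", 2014);
    filePartitionValue.put("t/0_0_2.parquet", 2015);
    filePartitionValue.put("t/0_0_3.parquet", 2016);

    Predicate<Integer> pruneCondition = year -> year >= 2015;  // extracted from the filter

    List<String> newFiles = new ArrayList<>();
    for (Map.Entry<String, Integer> e : filePartitionValue.entrySet()) {
      if (pruneCondition.test(e.getValue())) {   // analogous to the interpreted BIT output
        newFiles.add(e.getKey());
      }
    }
    if (newFiles.isEmpty()) {                    // canDropFilter = false in the rule
      newFiles.add(filePartitionValue.keySet().iterator().next());
    }
    System.out.println("Pruned scan reads: " + newFiles);
  }
}
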
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 1e63748..6dda1a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -39,6 +40,7 @@ import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScreenRel;
import org.apache.drill.exec.planner.logical.DrillWriterRel;
@@ -159,7 +161,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
@Override
public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
- final Prel child = ((Prel)prel.getInput()).accept(this, null);
+ final Prel child = ((Prel) prel.getInput()).accept(this, null);
final RelDataType childRowType = child.getRowType();
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
new file mode 100644
index 0000000..fedb473
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+import javax.inject.Inject;
+
+/**
+ * The functions are similar to those created through FreeMarker template for fixed types. There is not much benefit to
+ * using code generation for generating the functions for variable length types, so simply doing them by hand.
+ */
+public class NewValueFunction {
+
+ @FunctionTemplate(name = "newPartitionValue",
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ nulls = NullHandling.INTERNAL)
+ public static class NewValueVarChar implements DrillSimpleFunc {
+
+ @Param VarCharHolder in;
+ @Workspace VarCharHolder previous;
+ @Workspace Boolean initialized;
+ @Output BitHolder out;
+ @Inject DrillBuf buf;
+
+ public void setup() {
+ initialized = false;
+ previous.buffer = buf;
+ previous.start = 0;
+ }
+
+ public void eval() {
+ int length = in.end - in.start;
+
+ if (initialized) {
+ if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
+ out.value = 0;
+ } else {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ out.value = 1;
+ }
+ } else {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ out.value = 1;
+ initialized = true;
+ }
+ }
+ }
+
+ @FunctionTemplate(name = "newPartitionValue",
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ nulls = NullHandling.INTERNAL)
+ public static class NewValueVarCharNullable implements DrillSimpleFunc {
+
+ @Param NullableVarCharHolder in;
+ @Workspace NullableVarCharHolder previous;
+ @Workspace Boolean initialized;
+ @Output BitHolder out;
+ @Inject DrillBuf buf;
+
+ public void setup() {
+ initialized = false;
+ previous.buffer = buf;
+ previous.start = 0;
+ }
+
+ public void eval() {
+ int length = in.isSet == 0 ? 0 : in.end - in.start;
+
+ if (initialized) {
+ if (previous.isSet == 0 && in.isSet == 0 ||
+ (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+ previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0)) {
+ out.value = 0;
+ } else {
+ if (in.isSet == 1) {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ }
+ previous.isSet = in.isSet;
+ out.value = 1;
+ }
+ } else {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ previous.isSet = 1;
+ out.value = 1;
+ initialized = true;
+ }
+ }
+ }
+
+ @FunctionTemplate(name = "newPartitionValue",
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ nulls = NullHandling.INTERNAL)
+ public static class NewValueVarBinary implements DrillSimpleFunc {
+
+ @Param VarBinaryHolder in;
+ @Workspace VarBinaryHolder previous;
+ @Workspace Boolean initialized;
+ @Output BitHolder out;
+ @Inject DrillBuf buf;
+
+ public void setup() {
+ initialized = false;
+ previous.buffer = buf;
+ previous.start = 0;
+ }
+
+ public void eval() {
+ int length = in.end - in.start;
+
+ if (initialized) {
+ if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
+ out.value = 0;
+ } else {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ out.value = 1;
+ }
+ } else {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ out.value = 1;
+ initialized = true;
+ }
+ }
+ }
+
+ @FunctionTemplate(name = "newPartitionValue",
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ nulls = NullHandling.INTERNAL)
+ public static class NewValueVarBinaryNullable implements DrillSimpleFunc {
+
+ @Param NullableVarBinaryHolder in;
+ @Workspace NullableVarBinaryHolder previous;
+ @Workspace Boolean initialized;
+ @Output BitHolder out;
+ @Inject DrillBuf buf;
+
+ public void setup() {
+ initialized = false;
+ previous.buffer = buf;
+ previous.start = 0;
+ }
+
+ public void eval() {
+ int length = in.isSet == 0 ? 0 : in.end - in.start;
+
+ if (initialized) {
+ if (previous.isSet == 0 && in.isSet == 0 ||
+ (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+ previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0)) {
+ out.value = 0;
+ } else {
+ if (in.isSet == 1) {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ }
+ previous.isSet = in.isSet;
+ out.value = 1;
+ }
+ } else {
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
+ previous.end = in.end - in.start;
+ previous.isSet = 1;
+ out.value = 1;
+ initialized = true;
+ }
+ }
+ }
+}
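
As an illustrative aside (not part of this commit), each hand-written newPartitionValue variant above follows the
same pattern: remember the previous value in a @Workspace holder, emit 1 for the first row and whenever the incoming
bytes differ from the remembered ones, and emit 0 otherwise. A minimal plain-Java sketch of that comparison (the
class and method names below are made up; the real functions operate on DrillBuf-backed holders inside Drill's
generated code) could look like:

    import java.util.Arrays;

    public class PartitionChangeDetector {
      private byte[] previous;      // last value seen; null until the first call
      private boolean initialized;  // mirrors the @Workspace "initialized" flag

      /** Returns true (out.value = 1) when the incoming value differs from the previous one. */
      public boolean isNewValue(byte[] current) {
        if (!initialized) {
          previous = current == null ? null : current.clone();
          initialized = true;
          return true;                              // first value always starts a partition
        }
        if (Arrays.equals(previous, current)) {
          return false;                             // same value -> same partition
        }
        previous = current == null ? null : current.clone();
        return true;                                // value changed -> new partition boundary
      }

      public static void main(String[] args) {
        PartitionChangeDetector d = new PartitionChangeDetector();
        System.out.println(d.isNewValue("2014".getBytes()));  // true  (first value)
        System.out.println(d.isNewValue("2014".getBytes()));  // false (unchanged)
        System.out.println(d.isNewValue("2015".getBytes()));  // true  (changed)
      }
    }

The nullable variants treat two consecutive nulls as "unchanged", which Arrays.equals(null, null) also does in the
sketch above.
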
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index a43a4a0..ea45653 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.JSONOutputRecordWriter;
@@ -90,7 +91,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
}
@Override
- public void updateSchema(BatchSchema schema) throws IOException {
+ public void updateSchema(VectorAccessible batch) throws IOException {
// no op
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index cf39518..ec28833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -19,18 +19,28 @@ package org.apache.drill.exec.store.parquet;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
@@ -41,6 +51,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -54,14 +65,44 @@ import org.apache.drill.exec.store.schedule.BlockMapBuilder;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableIntervalVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableTinyIntVector;
+import org.apache.drill.exec.vector.NullableUInt1Vector;
+import org.apache.drill.exec.vector.NullableUInt2Vector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.joda.time.DateTimeUtils;
+import parquet.column.statistics.Statistics;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileWriter;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.api.Binary;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
import com.codahale.metrics.MetricRegistry;
@@ -74,6 +115,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractFileGroupScan {
@@ -169,6 +213,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.rowGroupInfos = that.rowGroupInfos == null ? null : Lists.newArrayList(that.rowGroupInfos);
this.selectionRoot = that.selectionRoot;
this.columnValueCounts = that.columnValueCounts;
+ this.columnTypeMap = that.columnTypeMap;
+ this.partitionValueMap = that.partitionValueMap;
+ this.fileSet = that.fileSet;
}
@@ -214,6 +261,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
+ public Set<String> getFileSet() {
+ return fileSet;
+ }
+
+ private Set<String> fileSet = Sets.newHashSet();
+
private void readFooterHelper(List<FileStatus> statuses) throws IOException {
watch.reset();
watch.start();
@@ -227,10 +280,19 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
ColumnChunkMetaData columnChunkMetaData;
List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
+ boolean first = true;
+ ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
for (Footer footer : footers) {
int index = 0;
ParquetMetadata metadata = footer.getParquetMetadata();
+ FileMetaData fileMetaData = metadataConverter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, metadata);
+ HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+ for (SchemaElement se : fileMetaData.getSchema()) {
+ schemaElements.put(se.getName(), se);
+ }
for (BlockMetaData rowGroup : metadata.getBlocks()) {
+ String file = Path.getPathWithoutSchemeAndAuthority(footer.getFile()).toString();
+ fileSet.add(file);
long valueCountInGrp = 0;
// need to grab block information from HDFS
columnChunkMetaData = rowGroup.getColumns().iterator().next();
@@ -242,28 +304,49 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
for (ColumnChunkMetaData col : rowGroup.getColumns()) {
length += col.getTotalSize();
valueCountInGrp = Math.max(col.getValueCount(), valueCountInGrp);
- SchemaPath path = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
+ SchemaPath schemaPath = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
long previousCount = 0;
long currentCount = 0;
- if (! columnValueCounts.containsKey(path)) {
+ if (! columnValueCounts.containsKey(schemaPath)) {
// create an entry for this column
- columnValueCounts.put(path, previousCount /* initialize to 0 */);
+ columnValueCounts.put(schemaPath, previousCount /* initialize to 0 */);
} else {
- previousCount = columnValueCounts.get(path);
+ previousCount = columnValueCounts.get(schemaPath);
}
boolean statsAvail = (col.getStatistics() != null && !col.getStatistics().isEmpty());
if (statsAvail && previousCount != GroupScan.NO_COLUMN_STATS) {
currentCount = col.getValueCount() - col.getStatistics().getNumNulls(); // only count non-nulls
- columnValueCounts.put(path, previousCount + currentCount);
+ columnValueCounts.put(schemaPath, previousCount + currentCount);
} else {
// even if 1 chunk does not have stats, we cannot rely on the value count for this column
- columnValueCounts.put(path, GroupScan.NO_COLUMN_STATS);
+ columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
}
+ // check if this column can be used for partition pruning
+ SchemaElement se = schemaElements.get(schemaPath.getAsUnescapedPath());
+ boolean partitionColumn = checkForPartitionColumn(schemaPath, col, se, first);
+ if (partitionColumn) {
+ Map<SchemaPath,Object> map = partitionValueMap.get(file);
+ if (map == null) {
+ map = Maps.newHashMap();
+ partitionValueMap.put(file, map);
+ }
+ Object value = map.get(schemaPath);
+ Object currentValue = col.getStatistics().genericGetMax();
+ if (value != null) {
+ if (value != currentValue) {
+ columnTypeMap.remove(schemaPath);
+ }
+ } else {
+ map.put(schemaPath, currentValue);
+ }
+ } else {
+ columnTypeMap.remove(schemaPath);
+ }
}
String filePath = footer.getFile().toUri().getPath();
@@ -272,6 +355,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
index++;
rowCount += rowGroup.getRowCount();
+ first = false;
}
}
@@ -281,6 +365,109 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
}
+ @JsonIgnore
+ private Map<SchemaPath,MajorType> columnTypeMap = Maps.newHashMap();
+
+ /**
+ * When reading the very first footer, any column is a potential partition column. So for the first footer, we check
+ * every column to see if it is single-valued, and if so, add it to the list of potential partition columns. For the
+ * remaining footers, we will not find any new partition columns, but we may discover that what was previously a
+ * potential partition column no longer qualifies, so it needs to be removed from the list.
+ * @param column the column being examined
+ * @param columnChunkMetaData metadata for this column's chunk in the current row group
+ * @param se the SchemaElement for this column from the Parquet file metadata
+ * @param first whether this is the first footer being read
+ * @return whether the column is (still) a potential partition column
+ */
+ private boolean checkForPartitionColumn(SchemaPath column, ColumnChunkMetaData columnChunkMetaData, SchemaElement se, boolean first) {
+ if (first) {
+ if (hasSingleValue(columnChunkMetaData)) {
+ columnTypeMap.put(column, getType(columnChunkMetaData, se));
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ if (!columnTypeMap.keySet().contains(column)) {
+ return false;
+ } else {
+ if (!hasSingleValue(columnChunkMetaData)) {
+ columnTypeMap.remove(column);
+ return false;
+ }
+ if (!getType(columnChunkMetaData, se).equals(columnTypeMap.get(column))) {
+ columnTypeMap.remove(column);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private MajorType getType(ColumnChunkMetaData columnChunkMetaData, SchemaElement schemaElement) {
+ ConvertedType originalType = schemaElement == null ? null : schemaElement.getConverted_type();
+
+ if (originalType != null) {
+ switch (originalType) {
+ case DECIMAL:
+ return Types.optional(MinorType.DECIMAL18);
+ case DATE:
+ return Types.optional(MinorType.DATE);
+ case TIME_MILLIS:
+ return Types.optional(MinorType.TIME);
+ case TIMESTAMP_MILLIS:
+ return Types.optional(MinorType.TIMESTAMP);
+ case UTF8:
+ return Types.optional(MinorType.VARCHAR);
+ case UINT_8:
+ return Types.optional(MinorType.UINT1);
+ case UINT_16:
+ return Types.optional(MinorType.UINT2);
+ case UINT_32:
+ return Types.optional(MinorType.UINT4);
+ case UINT_64:
+ return Types.optional(MinorType.UINT8);
+ case INT_8:
+ return Types.optional(MinorType.TINYINT);
+ case INT_16:
+ return Types.optional(MinorType.SMALLINT);
+ }
+ }
+
+ PrimitiveTypeName type = columnChunkMetaData.getType();
+ switch (type) {
+ case BOOLEAN:
+ return Types.optional(MinorType.BIT);
+ case INT32:
+ return Types.optional(MinorType.INT);
+ case INT64:
+ return Types.optional(MinorType.BIGINT);
+ case FLOAT:
+ return Types.optional(MinorType.FLOAT4);
+ case DOUBLE:
+ return Types.optional(MinorType.FLOAT8);
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return Types.optional(MinorType.VARBINARY);
+ default:
+ // Should never hit this
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ }
+ }
+
+ private boolean hasSingleValue(ColumnChunkMetaData columnChunkMetaData) {
+ Statistics stats = columnChunkMetaData.getStatistics();
+ boolean hasStats = stats != null && !stats.isEmpty();
+ if (hasStats) {
+ if (stats.genericGetMin() == null || stats.genericGetMax() == null) {
+ return false;
+ }
+ return stats.genericGetMax().equals(stats.genericGetMin());
+ } else {
+ return false;
+ }
+ }
+
@Override
public void modifyFileSelection(FileSelection selection) {
entries.clear();
@@ -289,6 +476,113 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
+ public MajorType getTypeForColumn(SchemaPath schemaPath) {
+ return columnTypeMap.get(schemaPath);
+ }
+
+ private Map<String,Map<SchemaPath,Object>> partitionValueMap = Maps.newHashMap();
+
+ public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
+ String f = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
+ MinorType type = getTypeForColumn(column).getMinorType();
+ switch (type) {
+ case INT: {
+ NullableIntVector intVector = (NullableIntVector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ intVector.getMutator().setSafe(index, value);
+ return;
+ }
+ case SMALLINT: {
+ NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ smallIntVector.getMutator().setSafe(index, value.shortValue());
+ return;
+ }
+ case TINYINT: {
+ NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ tinyIntVector.getMutator().setSafe(index, value.byteValue());
+ return;
+ }
+ case UINT1: {
+ NullableUInt1Vector intVector = (NullableUInt1Vector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ intVector.getMutator().setSafe(index, value.byteValue());
+ return;
+ }
+ case UINT2: {
+ NullableUInt2Vector intVector = (NullableUInt2Vector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ intVector.getMutator().setSafe(index, (char) value.shortValue());
+ return;
+ }
+ case UINT4: {
+ NullableUInt4Vector intVector = (NullableUInt4Vector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ intVector.getMutator().setSafe(index, value);
+ return;
+ }
+ case BIGINT: {
+ NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
+ Long value = (Long) partitionValueMap.get(f).get(column);
+ bigIntVector.getMutator().setSafe(index, value);
+ return;
+ }
+ case FLOAT4: {
+ NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
+ Float value = (Float) partitionValueMap.get(f).get(column);
+ float4Vector.getMutator().setSafe(index, value);
+ return;
+ }
+ case FLOAT8: {
+ NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
+ Double value = (Double) partitionValueMap.get(f).get(column);
+ float8Vector.getMutator().setSafe(index, value);
+ return;
+ }
+ case VARBINARY: {
+ NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
+ Binary value = (Binary) partitionValueMap.get(f).get(column);
+ byte[] bytes = value.getBytes();
+ varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+ return;
+ }
+ case DECIMAL18: {
+ NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
+ Long value = (Long) partitionValueMap.get(f).get(column);
+ decimalVector.getMutator().setSafe(index, value);
+ return;
+ }
+ case DATE: {
+ NullableDateVector dateVector = (NullableDateVector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ return;
+ }
+ case TIME: {
+ NullableTimeVector timeVector = (NullableTimeVector) v;
+ Integer value = (Integer) partitionValueMap.get(f).get(column);
+ timeVector.getMutator().set(index, value);
+ return;
+ }
+ case TIMESTAMP: {
+ NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
+ Long value = (Long) partitionValueMap.get(f).get(column);
+ timeStampVector.getMutator().set(index, value);
+ return;
+ }
+ case VARCHAR: {
+ NullableVarCharVector varCharVector = (NullableVarCharVector) v;
+ Binary value = (Binary) partitionValueMap.get(f).get(column);
+ byte[] bytes = value.getBytes();
+ varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+ return;
+ }
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
private EndpointByteMap byteMap;
@@ -476,4 +770,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0;
}
+ @Override
+ public List<SchemaPath> getPartitionColumns() {
+ return new ArrayList(columnTypeMap.keySet());
+ }
}
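
As an illustrative aside (not part of this commit), the checkForPartitionColumn/hasSingleValue walk above reduces to
a simple pass over row-group statistics: every column that is single-valued in the first row group starts out as a
candidate, and later row groups can only remove candidates -- a column is dropped as soon as any row group shows more
than one value for it (or a different type). A standalone sketch of that pass, with hypothetical names (ColumnStats,
detectPartitionColumns) and plain min/max pairs standing in for Parquet's ColumnChunkMetaData statistics:

    import java.util.*;

    public class PartitionColumnSketch {

      /** min/max for one column chunk; the chunk is single-valued when min equals max. */
      static class ColumnStats {
        final Object min, max;
        ColumnStats(Object min, Object max) { this.min = min; this.max = max; }
        boolean singleValued() { return min != null && min.equals(max); }
      }

      /**
       * rowGroups holds one map per row group (column name -> stats), in the order the footers are read.
       * Returns the columns that are single-valued in every row group -- the auto-partitioning candidates.
       */
      static Set<String> detectPartitionColumns(List<Map<String, ColumnStats>> rowGroups) {
        Set<String> candidates = null;
        for (Map<String, ColumnStats> rowGroup : rowGroups) {
          if (candidates == null) {
            // first row group: every single-valued column is a candidate
            candidates = new HashSet<>();
            for (Map.Entry<String, ColumnStats> e : rowGroup.entrySet()) {
              if (e.getValue().singleValued()) {
                candidates.add(e.getKey());
              }
            }
          } else {
            // later row groups can only remove candidates, never add new ones
            for (Iterator<String> it = candidates.iterator(); it.hasNext();) {
              ColumnStats s = rowGroup.get(it.next());
              if (s == null || !s.singleValued()) {
                it.remove();
              }
            }
          }
        }
        return candidates == null ? Collections.<String>emptySet() : candidates;
      }

      public static void main(String[] args) {
        Map<String, ColumnStats> rg1 = new HashMap<>();
        rg1.put("year", new ColumnStats(2014, 2014));   // single value in this row group -> candidate
        rg1.put("id",   new ColumnStats(1, 500));       // a range of values -> not a candidate
        Map<String, ColumnStats> rg2 = new HashMap<>();
        rg2.put("year", new ColumnStats(2015, 2015));   // still single-valued within its row group
        rg2.put("id",   new ColumnStats(501, 900));
        System.out.println(detectPartitionColumns(Arrays.asList(rg1, rg2)));  // [year]
      }
    }

The committed code additionally records one value per file in partitionValueMap and drops a column whose recorded
value changes within a single file, so that populatePruningVector can later fill the pruning vectors with exactly one
value per file during partition pruning.
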
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 621f05c..12b15a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,17 +25,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.store.EventBasedRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -94,11 +102,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private String prefix;
private int index = 0;
private OperatorContext oContext;
+ private List<String> partitionColumns;
+ private boolean hasPartitions;
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
super();
this.oContext = context.newOperatorContext(writer, true);
this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator());
+ this.partitionColumns = writer.getPartitionColumns();
+ this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
}
@Override
@@ -132,19 +144,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
}
@Override
- public void updateSchema(BatchSchema batchSchema) throws IOException {
- if (this.batchSchema == null || !this.batchSchema.equals(batchSchema)) {
+ public void updateSchema(VectorAccessible batch) throws IOException {
+ if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema())) {
if (this.batchSchema != null) {
flush();
}
- this.batchSchema = batchSchema;
+ this.batchSchema = batch.getSchema();
newSchema();
}
+ TypedFieldId fieldId = batch.getValueVectorId(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD));
+ if (fieldId != null) {
+ VectorWrapper w = batch.getValueAccessorById(BitVector.class, fieldId.getFieldIds());
+ setPartitionVector((BitVector) w.getValueVector());
+ }
}
private void newSchema() throws IOException {
List<Type> types = Lists.newArrayList();
for (MaterializedField field : batchSchema) {
+ if (field.getPath().equals(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD))) {
+ continue;
+ }
types.add(getType(field));
}
schema = new MessageType("root", types);
@@ -189,6 +209,22 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
}
}
+ @Override
+ public void checkForNewPartition(int index) {
+ if (!hasPartitions) {
+ return;
+ }
+ try {
+ boolean newPartition = newPartition(index);
+ if (newPartition) {
+ flush();
+ newSchema();
+ }
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
private void flush() throws IOException {
if (recordCount > 0) {
parquetFileWriter.startBlock(recordCount);
http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 31b1fbe..8a74b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.StringOutputRecordWriter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;