You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2016/08/17 20:14:14 UTC
drill git commit: DRILL-4846: Fix a few performance issues for
metadata access:
Repository: drill
Updated Branches:
refs/heads/master 0a4c21cc1 -> 57dc9f43b
DRILL-4846: Fix a few performance issues for metadata access:
- Create a MetadataContext that can be shared among multiple invocations of the Metadata APIs.
- Check directory modification time only if not previously checked.
- Remove a redundant call for metadata read.
- Add more logging.
- Consolidate a couple of metadata methods.
close apache/drill#569
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/57dc9f43
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/57dc9f43
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/57dc9f43
Branch: refs/heads/master
Commit: 57dc9f43b6c578c1f90fb7f9e2495b184ab17b21
Parents: 0a4c21c
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Aug 3 09:00:51 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Wed Aug 17 12:04:00 2016 -0700
----------------------------------------------------------------------
.../planner/AbstractPartitionDescriptor.java | 3 +-
.../planner/FileSystemPartitionDescriptor.java | 11 +-
.../planner/ParquetPartitionDescriptor.java | 10 +-
.../drill/exec/planner/PartitionDescriptor.java | 4 +-
.../logical/partition/PruneScanRule.java | 24 +++-
.../drill/exec/store/dfs/FileSelection.java | 21 ++-
.../drill/exec/store/dfs/MetadataContext.java | 60 +++++++++
.../drill/exec/store/parquet/Metadata.java | 127 +++++++++++--------
.../exec/store/parquet/ParquetFormatPlugin.java | 8 +-
.../exec/store/parquet/ParquetGroupScan.java | 18 +--
10 files changed, 202 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index d8872dc..ed62c91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.drill.exec.store.dfs.MetadataContext;
/**
* Abstract base class for file system based partition descriptors and Hive partition descriptors.
@@ -65,7 +66,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
- boolean isAllPruned) throws Exception {
+ boolean isAllPruned, MetadataContext metaContext) throws Exception {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 7796212..a04f34d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -211,7 +212,7 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
- boolean wasAllPartitionsPruned) throws Exception {
+ boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
List<String> newFiles = Lists.newArrayList();
for (final PartitionLocation location : newPartitionLocation) {
if (!location.isCompositePartition()) {
@@ -228,6 +229,7 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
final FormatSelection formatSelection = (FormatSelection)table.getSelection();
final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+ newFileSelection.setMetaContext(metaContext);
final FileGroupScan newGroupScan =
((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
return new DrillScanRel(scanRel.getCluster(),
@@ -239,18 +241,19 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
true /*filter pushdown*/);
} else if (scanRel instanceof EnumerableTableScan) {
return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
- wasAllPartitionsPruned);
+ wasAllPartitionsPruned, metaContext);
} else {
throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
}
}
private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
- boolean wasAllPartitionsPruned) {
+ boolean wasAllPartitionsPruned, MetadataContext metaContext) {
final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
final FormatSelection formatSelection = (FormatSelection) table.getSelection();
final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+ newFileSelection.setMetaContext(metaContext);
final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
final DrillTranslatableTable newTable = new DrillTranslatableTable(
new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
@@ -265,7 +268,7 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
- return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
+ return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index 2c8ca95..534eb5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.exec.vector.ValueVector;
@@ -81,9 +82,10 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
}
private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot,
- boolean wasAllPartitionsPruned) throws IOException {
+ boolean wasAllPartitionsPruned, MetadataContext metaContext) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned);
+ newSelection.setMetaContext(metaContext);
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
return newScan;
}
@@ -134,13 +136,13 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
- boolean wasAllPartitionsPruned) throws Exception {
+ boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
List<String> newFiles = Lists.newArrayList();
for (final PartitionLocation location : newPartitionLocation) {
newFiles.add(location.getEntirePartitionLocation());
}
- final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned);
+ final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned, metaContext);
return new DrillScanRel(scanRel.getCluster(),
scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
@@ -154,7 +156,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
- return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
+ return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 83cb146..daee249 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.ValueVector;
import java.util.BitSet;
@@ -90,11 +91,12 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
* @param newPartitions
* @param cacheFileRoot
* @param wasAllPartitionsPruned
+ * @param metaContext
* @return
* @throws Exception
*/
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
- boolean wasAllPartitionsPruned) throws Exception;
+ boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception;
public boolean supportsMetadataCachePruning();
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index d3e38fa..011c783 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -60,6 +60,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -406,8 +407,13 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
+ final Object selection = getDrillTable(scanRel).getSelection();
+ MetadataContext metaContext = null;
+ if (selection instanceof FormatSelection) {
+ metaContext = ((FormatSelection)selection).getSelection().getMetaContext();
+ }
RelNode inputRel = descriptor.supportsMetadataCachePruning() ?
- descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
+ descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned, metaContext) :
descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
if (projectRel != null) {
@@ -488,14 +494,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel);
+ private static DrillTable getDrillTable(final TableScan scan) {
+ DrillTable drillTable;
+ drillTable = scan.getTable().unwrap(DrillTable.class);
+ if (drillTable == null) {
+ drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+ }
+ return drillTable;
+ }
+
private static boolean isQualifiedDirPruning(final TableScan scan) {
if (scan instanceof EnumerableTableScan) {
- DrillTable drillTable;
- drillTable = scan.getTable().unwrap(DrillTable.class);
- if (drillTable == null) {
- drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
- }
- final Object selection = drillTable.getSelection();
+ final Object selection = getDrillTable(scan).getSelection();
if (selection instanceof FormatSelection
&& ((FormatSelection)selection).supportDirPruning()) {
return true; // Do directory-based pruning in Calcite logical
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 8904c82..3ab6277 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
import java.io.IOException;
import java.net.URI;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-
+import com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.hadoop.fs.FileStatus;
@@ -56,6 +57,11 @@ public class FileSelection {
*/
public final String cacheFileRoot;
+ /**
+ * metadata context useful for metadata operations (if any)
+ */
+ private MetadataContext metaContext = null;
+
private enum StatusType {
NOT_CHECKED, // initial state
NO_DIRS, // no directories in this selection
@@ -106,6 +112,7 @@ public class FileSelection {
this.selectionRoot = selection.selectionRoot;
this.dirStatus = selection.dirStatus;
this.cacheFileRoot = selection.cacheFileRoot;
+ this.metaContext = selection.metaContext;
this.hadWildcard = selection.hadWildcard;
this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
}
@@ -124,7 +131,7 @@ public class FileSelection {
}
statuses = newStatuses;
}
- logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}",
+ logger.info("FileSelection.getStatuses() took {} ms, numFiles: {}",
timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());
return statuses;
@@ -328,6 +335,7 @@ public class FileSelection {
public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
final String cacheFileRoot) {
+ Stopwatch timer = Stopwatch.createStarted();
final String root = selection.getSelectionRoot();
if (Strings.isNullOrEmpty(root)) {
throw new DrillRuntimeException("Selection root is null or empty" + root);
@@ -354,6 +362,7 @@ public class FileSelection {
final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
fileSel.setHadWildcard(selection.hadWildcard());
+ logger.info("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
return fileSel;
}
@@ -402,6 +411,14 @@ public class FileSelection {
return cacheFileRoot;
}
+ public void setMetaContext(MetadataContext context) {
+ metaContext = context;
+ }
+
+ public MetadataContext getMetaContext() {
+ return metaContext;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
new file mode 100644
index 0000000..17852ab
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A metadata context that holds state across multiple invocations of
+ * the Parquet metadata APIs.
+ */
+public class MetadataContext {
+
+ /** Map of directory path to the status of whether modification time was already checked.
+ * Note: the #directories is typically a small percentage of the #files, so the memory footprint
+ * is expected to be relatively small.
+ */
+ private Map<String, Boolean> dirModifCheckMap = Maps.newHashMap();
+
+ public MetadataContext() {
+ }
+
+ public void setStatus(String dir) {
+ dirModifCheckMap.put(dir, true);
+ }
+
+ public void clearStatus(String dir) {
+ dirModifCheckMap.put(dir, false);
+ }
+
+ public boolean getStatus(String dir) {
+ if (dirModifCheckMap.containsKey(dir)) {
+ return dirModifCheckMap.get(dir);
+ }
+ return false;
+ }
+
+ public void clear() {
+ dirModifCheckMap.clear();
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 45f7ca2..86b860a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.TimedRunnable;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -81,6 +81,9 @@ public class Metadata {
private final FileSystem fs;
+ private ParquetTableMetadataBase parquetTableMetadata;
+ private ParquetTableMetadataDirs parquetTableMetadataDirs;
+
/**
* Create the parquet metadata file for the directory at the given path, and for any subdirectories
*
@@ -129,14 +132,16 @@ public class Metadata {
* @return
* @throws IOException
*/
- public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path) throws IOException {
+ public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext) throws IOException {
Metadata metadata = new Metadata(fs);
- return metadata.readBlockMeta(path);
+ metadata.readBlockMeta(path, false, metaContext);
+ return metadata.parquetTableMetadata;
}
- public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path) throws IOException {
+ public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext) throws IOException {
Metadata metadata = new Metadata(fs);
- return metadata.readMetadataDirs(path);
+ metadata.readBlockMeta(path, true, metaContext);
+ return metadata.parquetTableMetadataDirs;
}
private Metadata(FileSystem fs) {
@@ -151,6 +156,7 @@ public class Metadata {
*/
private Pair<ParquetTableMetadata_v2, ParquetTableMetadataDirs>
createMetaFilesRecursively(final String path) throws IOException {
+ Stopwatch timer = Stopwatch.createStarted();
List<ParquetFileMetadata_v2> metaDataList = Lists.newArrayList();
List<String> directoryList = Lists.newArrayList();
ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfoSet =
@@ -199,9 +205,13 @@ public class Metadata {
if (directoryList.size() > 0 && childFiles.size() == 0) {
ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME));
+ logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
+ timer.stop();
return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
}
List<String> emptyDirList = Lists.newArrayList();
+ logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
+ timer.stop();
return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(emptyDirList));
}
@@ -453,9 +463,12 @@ public class Metadata {
* @return
* @throws IOException
*/
- private ParquetTableMetadataBase readBlockMeta(String path) throws IOException {
+ private void readBlockMeta(String path,
+ boolean dirsOnly,
+ MetadataContext metaContext) throws IOException {
Stopwatch timer = Stopwatch.createStarted();
Path p = new Path(path);
+ Path parentDir = p.getParent(); // parent directory of the metadata file
ObjectMapper mapper = new ObjectMapper();
final SimpleModule serialModule = new SimpleModule();
@@ -470,41 +483,38 @@ public class Metadata {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
FSDataInputStream is = fs.open(p);
- ParquetTableMetadataBase parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
- logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
- timer.stop();
- if (tableModified(parquetTableMetadata, p)) {
- parquetTableMetadata =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
- }
- return parquetTableMetadata;
- }
-
- private ParquetTableMetadataDirs readMetadataDirs(String path) throws IOException {
- Stopwatch timer = Stopwatch.createStarted();
- Path p = new Path(path);
- ObjectMapper mapper = new ObjectMapper();
-
- final SimpleModule serialModule = new SimpleModule();
- serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
-
- AfterburnerModule module = new AfterburnerModule();
- module.setUseOptimizedBeanDeserializer(true);
+ boolean alreadyCheckedModification = false;
+ boolean newMetadata = false;
- mapper.registerModule(serialModule);
- mapper.registerModule(module);
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- FSDataInputStream is = fs.open(p);
+ if (metaContext != null) {
+ alreadyCheckedModification = metaContext.getStatus(parentDir.toString());
+ }
- ParquetTableMetadataDirs parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
- logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
- timer.stop();
+ if (dirsOnly) {
+ parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
+ logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+ timer.stop();
+ if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) {
+ parquetTableMetadataDirs =
+ (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+ newMetadata = true;
+ }
+ } else {
+ parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
+ logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+ timer.stop();
+ if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) {
+ parquetTableMetadata =
+ (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
+ newMetadata = true;
+ }
+ }
- if (tableModified(parquetTableMetadataDirs, p)) {
- parquetTableMetadataDirs =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+ if (newMetadata && metaContext != null) {
+ // if new metadata files were created, invalidate the existing metadata context
+ metaContext.clear();
}
- return parquetTableMetadataDirs;
+
}
/**
@@ -516,35 +526,42 @@ public class Metadata {
* @return
* @throws IOException
*/
- private boolean tableModified(ParquetTableMetadataBase tableMetadata, Path metaFilePath)
+ private boolean tableModified(List<String> directories, Path metaFilePath,
+ Path parentDir,
+ MetadataContext metaContext)
throws IOException {
- long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
- FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
- if (directoryStatus.getModificationTime() > metaFileModifyTime) {
- return true;
- }
- for (String directory : tableMetadata.getDirectories()) {
- directoryStatus = fs.getFileStatus(new Path(directory));
- if (directoryStatus.getModificationTime() > metaFileModifyTime) {
- return true;
- }
- }
- return false;
- }
- private boolean tableModified(ParquetTableMetadataDirs tableMetadataDirs, Path metaFilePath)
- throws IOException {
+ Stopwatch timer = Stopwatch.createStarted();
+
+ if (metaContext != null) {
+ metaContext.setStatus(parentDir.toString());
+ }
long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
- FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
+ FileStatus directoryStatus = fs.getFileStatus(parentDir);
+ int numDirs = 1;
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+ logger.info("Directory {} was modified. Took {} ms to check modification time of {} directories", directoryStatus.getPath().toString(),
+ timer.elapsed(TimeUnit.MILLISECONDS),
+ numDirs);
+ timer.stop();
return true;
}
- for (String directory : tableMetadataDirs.getDirectories()) {
+ for (String directory : directories) {
+ numDirs++;
+ if (metaContext != null) {
+ metaContext.setStatus(directory);
+ }
directoryStatus = fs.getFileStatus(new Path(directory));
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+ logger.info("Directory {} was modified. Took {} ms to check modification time of {} directories", directoryStatus.getPath().toString(),
+ timer.elapsed(TimeUnit.MILLISECONDS),
+ numDirs);
+ timer.stop();
return true;
}
}
+ logger.info("No directories were modified. Took {} ms to check modification time of {} directories", timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+ timer.stop();
return false;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 5174893..1ab621b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataDirs;
import org.apache.hadoop.conf.Configuration;
@@ -214,11 +215,16 @@ public class ParquetFormatPlugin implements FormatPlugin{
// the directory is readable since the metadata 'directories' file cannot be created otherwise. Note
// that isDirReadable() does a similar check with the metadata 'cache' file.
if (fs.exists(dirMetaPath)) {
- ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString());
+ // create a metadata context that will be used for the duration of the query for this table
+ MetadataContext metaContext = new MetadataContext();
+
+ ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext);
if (mDirs.getDirectories().size() > 0) {
FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
dirSelection.setExpandedPartial();
+ dirSelection.setMetaContext(metaContext);
+
return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
new FormatSelection(plugin.getConfig(), dirSelection));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index f666439..21227e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -149,7 +150,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.selectionRoot = selectionRoot;
this.cacheFileRoot = cacheFileRoot;
- init();
+ init(null);
}
public ParquetGroupScan( //
@@ -176,7 +177,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
entries.add(new ReadEntryWithPath(fileName));
}
- init();
+ init(fileSelection.getMetaContext());
}
/*
@@ -570,7 +571,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
// we only select the files that are part of selection (by setting fileSet appropriately)
// get (and set internal field) the metadata for the directory by reading the metadata file
- this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
+ this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString(), selection.getMetaContext());
List<FileStatus> fileStatuses = selection.getStatuses(fs);
if (fileSet == null) {
@@ -586,7 +587,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
} else if (selection.isExpandedPartial() && !selection.hadWildcard() &&
cacheFileRoot != null) {
- this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
if (selection.wasAllPartitionsPruned()) {
// if all partitions were previously pruned, we only need to read 1 file (for the schema)
fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
@@ -605,7 +605,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
if (status.isDirectory()) {
//TODO [DRILL-4496] read the metadata cache files in parallel
final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME);
- final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString());
+ final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext());
for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
fileSet.add(file.getPath());
}
@@ -641,7 +641,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return newSelection;
}
- private void init() throws IOException {
+ private void init(MetadataContext metaContext) throws IOException {
if (entries.size() == 1 && parquetTableMetadata == null) {
Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
Path metaPath = null;
@@ -652,7 +652,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
if (metaPath != null && fs.exists(metaPath)) {
usedMetadataCache = true;
- parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+ parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext);
} else {
parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
}
@@ -662,7 +662,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
usedMetadataCache = true;
if (parquetTableMetadata == null) {
- parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+ parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext);
}
if (fileSet != null) {
parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
@@ -883,7 +883,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
ParquetGroupScan newScan = new ParquetGroupScan(this);
newScan.modifyFileSelection(selection);
newScan.cacheFileRoot = selection.cacheFileRoot;
- newScan.init();
+ newScan.init(selection.getMetaContext());
return newScan;
}