You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/20 02:23:37 UTC

[1/2] drill git commit: DRILL-2743: Parquet file metadata caching

Repository: drill
Updated Branches:
  refs/heads/master 3f5ebafca -> 1716cb083


DRILL-2743: Parquet file metadata caching

rebasing on top of master required conflict resolution in Parser.tdd and parserImpls.ftl

this closes #114


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0ee60958
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0ee60958
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0ee60958

Branch: refs/heads/master
Commit: 0ee609581423b9649391be02013da86fe9b0e2f2
Parents: 3f5ebaf
Author: Steven Phillips <sm...@apache.org>
Authored: Tue Aug 18 13:24:33 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Fri Sep 18 16:19:13 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |   2 +-
 .../drill/common/expression/SchemaPath.java     |   4 +-
 exec/java-exec/src/main/codegen/data/Parser.tdd |   7 +-
 .../src/main/codegen/includes/parserImpls.ftl   |  21 +
 .../sql/handlers/RefreshMetadataHandler.java    | 128 +++++
 .../planner/sql/parser/SqlRefreshMetadata.java  |  97 ++++
 .../drill/exec/store/dfs/DrillPathFilter.java   |   3 +
 .../drill/exec/store/parquet/Metadata.java      | 498 +++++++++++++++++++
 .../exec/store/parquet/ParquetFormatPlugin.java |   3 +
 .../exec/store/parquet/ParquetGroupScan.java    | 367 +++++++-------
 .../exec/store/schedule/AffinityCreator.java    |   1 -
 .../store/parquet/TestParquetMetadataCache.java |  62 +++
 12 files changed, 986 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 46fc7f9..f6d10ea 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -72,7 +72,7 @@ public final class DrillConfig extends NestedConfig{
     if (enableServerConfigs) {
       SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule")
         .addDeserializer(LogicalExpression.class, new LogicalExpression.De(this))
-        .addDeserializer(SchemaPath.class, new SchemaPath.De(this));
+        .addDeserializer(SchemaPath.class, new SchemaPath.De());
 
       mapper.registerModule(deserModule);
       mapper.enable(SerializationFeature.INDENT_OUTPUT);

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 9718071..f5fc687 100644
--- a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -252,11 +252,9 @@ public class SchemaPath extends LogicalExpressionBase {
   }
 
   public static class De extends StdDeserializer<SchemaPath> {
-    DrillConfig config;
 
-    public De(DrillConfig config) {
+    public De() {
       super(LogicalExpression.class);
-      this.config = config;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index 6ff7fa4..3e5f336 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -34,7 +34,9 @@
     "SHOW",
     "TABLES",
     "USE",
-    "FILES"
+    "FILES",
+    "REFRESH",
+    "METADATA"
   ]
 
   # List of methods for parsing custom SQL statements.
@@ -47,7 +49,8 @@
     "SqlDropView()",
     "SqlShowFiles()",
     "SqlCreateTable()",
-    "SqlDropTable()"
+    "SqlDropTable()",
+    "SqlRefreshMetadata()"
   ]
 
   # List of methods for parsing custom literals.

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index c761d47..9dce04a 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -257,3 +257,24 @@ SqlNode SqlDropTable() :
     }
 }
 
+/**
+ * Parse refresh table metadata statement.
+ * REFRESH TABLE METADATA tblname
+ */
+SqlNode SqlRefreshMetadata() :
+{
+    SqlParserPos pos;
+    SqlIdentifier tblName;
+    SqlNodeList fieldList;
+    SqlNode query;
+}
+{
+    <REFRESH> { pos = getPos(); }
+    <TABLE>
+    <METADATA>
+    tblName = CompoundIdentifier()
+    {
+        return new SqlRefreshMetadata(pos, tblName);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
new file mode 100644
index 0000000..ce4059b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+import java.util.List;
+
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillWriterRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
+
+public class RefreshMetadataHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class);
+
+  public RefreshMetadataHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
+  private PhysicalPlan direct(boolean outcome, String message, Object... values){
+    return DirectPlan.createDirectPlan(context, outcome, String.format(message, values));
+  }
+
+  private PhysicalPlan notSupported(String tbl){
+    return direct(false, "Table %s does not support metadata refresh.  Support is currently limited to single-directory-based Parquet tables.", tbl);
+  }
+
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
+    final SqlRefreshMetadata refreshTable = unwrap(sqlNode, SqlRefreshMetadata.class);
+
+    try {
+
+      final SchemaPlus schema = findSchema(context.getNewDefaultSchema(),
+          refreshTable.getSchemaPath());
+
+      final String tableName = refreshTable.getName();
+
+      if (tableName.contains("*") || tableName.contains("?")) {
+        return direct(false, "Glob path %s not supported for metadata refresh", tableName);
+      }
+
+      final Table table = schema.getTable(tableName);
+
+      if(table == null){
+        return direct(false, "Table %s does not exist.", tableName);
+      }
+
+      if(! (table instanceof DrillTable) ){
+        return notSupported(tableName);
+      }
+
+
+      final DrillTable drillTable = (DrillTable) table;
+
+      final Object selection = drillTable.getSelection();
+      if( !(selection instanceof FormatSelection) ){
+        return notSupported(tableName);
+      }
+
+      FormatSelection formatSelection = (FormatSelection) selection;
+
+      FormatPluginConfig formatConfig = formatSelection.getFormat();
+      if (!((formatConfig instanceof ParquetFormatConfig) ||
+          ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+        return notSupported(tableName);
+      }
+
+      FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
+      DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+
+      String selectionRoot = formatSelection.getSelection().selectionRoot;
+      if (!fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+        return notSupported(tableName);
+      }
+
+      Metadata.createMeta(fs, selectionRoot);
+      return direct(true, "Successfully updated metadata for table %s.", tableName);
+
+    } catch(Exception e) {
+      logger.error("Failed to update metadata for table '{}'", refreshTable.getName(), e);
+      return DirectPlan.createDirectPlan(context, false, String.format("Error: %s", e.getMessage()));
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
new file mode 100644
index 0000000..01050b8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import java.util.List;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Sql parse tree node to represent statement:
+ * REFRESH TABLE METADATA tblname
+ */
+public class SqlRefreshMetadata extends DrillSqlCall {
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("REFRESH_TABLE_METADATA", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      return new SqlRefreshMetadata(pos, (SqlIdentifier) operands[0]);
+    }
+  };
+
+  private SqlIdentifier tblName;
+
+  public SqlRefreshMetadata(SqlParserPos pos, SqlIdentifier tblName){
+    super(pos);
+    this.tblName = tblName;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    List<SqlNode> ops = Lists.newArrayList();
+    ops.add(tblName);
+    return ops;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("REFRESH");
+    writer.keyword("TABLE");
+    writer.keyword("METADATA");
+    tblName.unparse(writer, leftPrec, rightPrec);
+  }
+
+  public String getName() {
+    if (tblName.isSimple()) {
+      return tblName.getSimple();
+    }
+
+    return tblName.names.get(tblName.names.size() - 1);
+  }
+
+  public List<String> getSchemaPath() {
+    if (tblName.isSimple()) {
+      return ImmutableList.of();
+    }
+
+    return tblName.names.subList(0, tblName.names.size() - 1);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return new RefreshMetadataHandler(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
index 00f463d..5c2d71a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
@@ -29,6 +29,9 @@ public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter {
     if (path.getName().startsWith(DrillFileSystem.DOT_FILE_PREFIX)) {
       return false;
     }
+    if (path.getName().startsWith(".")) {
+      return false;
+    }
     return super.accept(path);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
new file mode 100644
index 0000000..02414a4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator.Feature;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.SchemaPath.De;
+import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.column.statistics.Statistics;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class Metadata {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
+
+  public static final String METADATA_FILENAME = ".drill.parquet_metadata";
+
+  private final FileSystem fs;
+
+  /**
+   * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+   * @param fs
+   * @param path
+   * @throws IOException
+   */
+  public static void createMeta(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    metadata.createMetaFilesRecursively(path);
+  }
+
+  /**
+   * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories
+   * @param fs
+   * @param path
+   * @return
+   * @throws IOException
+   */
+  public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.getParquetTableMetadata(path);
+  }
+
+  /**
+   * Get the parquet metadata for a list of parquet files
+   * @param fs
+   * @param fileStatuses
+   * @return
+   * @throws IOException
+   */
+  public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs,
+                                                             List<FileStatus> fileStatuses) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.getParquetTableMetadata(fileStatuses);
+  }
+
+  /**
+   * Get the parquet metadata for a directory by reading the metadata file
+   * @param fs
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @return
+   * @throws IOException
+   */
+  public static ParquetTableMetadata_v1 readBlockMeta(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.readBlockMeta(path);
+  }
+
+  private Metadata(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  /**
+   * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+   * @param path
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) throws IOException {
+    List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+    List<String> directoryList = Lists.newArrayList();
+    Path p = new Path(path);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    assert fileStatus.isDirectory() : "Expected directory";
+
+    final List<FileStatus> childFiles = Lists.newArrayList();
+
+    for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
+      if (file.isDirectory()) {
+        ParquetTableMetadata_v1 subTableMetadata = createMetaFilesRecursively(file.getPath().toString());
+        metaDataList.addAll(subTableMetadata.files);
+        directoryList.addAll(subTableMetadata.directories);
+        directoryList.add(file.getPath().toString());
+      } else {
+        childFiles.add(file);
+      }
+    }
+    if (childFiles.size() > 0) {
+      metaDataList.addAll(getParquetFileMetadata(childFiles));
+    }
+    ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(metaDataList, directoryList);
+    writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
+    return parquetTableMetadata;
+  }
+
+  /**
+   * Get the parquet metadata for the parquet files in a directory
+   * @param path the path of the directory
+   * @return
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOException {
+    Path p = new Path(path);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    List<FileStatus> fileStatuses = getFileStatuses(fileStatus);
+    logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
+    return getParquetTableMetadata(fileStatuses);
+  }
+
+  /**
+   * Get the parquet metadata for a list of parquet files
+   * @param fileStatuses
+   * @return
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 getParquetTableMetadata(List<FileStatus> fileStatuses) throws IOException {
+    List<ParquetFileMetadata> fileMetadataList = getParquetFileMetadata(fileStatuses);
+    return new ParquetTableMetadata_v1(fileMetadataList, new ArrayList<String>());
+  }
+
+  /**
+   * Get a list of file metadata for a list of parquet files
+   * @param fileStatuses
+   * @return
+   * @throws IOException
+   */
+  private List<ParquetFileMetadata> getParquetFileMetadata(List<FileStatus> fileStatuses) throws IOException {
+    List<TimedRunnable<ParquetFileMetadata>> gatherers = Lists.newArrayList();
+    for (FileStatus file : fileStatuses) {
+      gatherers.add(new MetadataGatherer(file));
+    }
+
+    List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+    metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16));
+    return metaDataList;
+  }
+
+  /**
+   * Recursively get a list of files
+   * @param fileStatus
+   * @return
+   * @throws IOException
+   */
+  private List<FileStatus> getFileStatuses(FileStatus fileStatus) throws IOException {
+    List<FileStatus> statuses = Lists.newArrayList();
+    if (fileStatus.isDirectory()) {
+      for (FileStatus child : fs.listStatus(fileStatus.getPath(), new DrillPathFilter())) {
+        statuses.addAll(getFileStatuses(child));
+      }
+    } else {
+      statuses.add(fileStatus);
+    }
+    return statuses;
+  }
+
+  /**
+   * TimedRunnable that reads the footer from parquet and collects file metadata
+   */
+  private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata> {
+
+    private FileStatus fileStatus;
+
+    public MetadataGatherer(FileStatus fileStatus) {
+      this.fileStatus = fileStatus;
+    }
+
+    @Override
+    protected ParquetFileMetadata runInner() throws Exception {
+      return getParquetFileMetadata(fileStatus);
+    }
+
+    @Override
+    protected IOException convertToIOException(Exception e) {
+      if (e instanceof IOException) {
+        return (IOException) e;
+      } else {
+        return new IOException(e);
+      }
+    }
+  }
+
+  private OriginalType getOriginalType(Type type, String[] path, int depth) {
+    if (type.isPrimitive()) {
+      return type.getOriginalType();
+    }
+    Type t = ((GroupType) type).getType(path[path.length - depth - 1]);
+    return getOriginalType(t, path, depth + 1);
+  }
+
+  /**
+   * Get the metadata for a single file
+   * @param file
+   * @return
+   * @throws IOException
+   */
+  private ParquetFileMetadata getParquetFileMetadata(FileStatus file) throws IOException {
+    ParquetMetadata metadata = ParquetFileReader.readFooter(fs.getConf(), file);
+    MessageType schema = metadata.getFileMetaData().getSchema();
+
+    Map<SchemaPath,OriginalType> originalTypeMap = Maps.newHashMap();
+    schema.getPaths();
+    for (String[] path : schema.getPaths()) {
+      originalTypeMap.put(SchemaPath.getCompoundPath(path), getOriginalType(schema, path, 0));
+    }
+
+    List<RowGroupMetadata> rowGroupMetadataList = Lists.newArrayList();
+
+    for (BlockMetaData rowGroup : metadata.getBlocks()) {
+      List<ColumnMetadata> columnMetadataList = Lists.newArrayList();
+      long length = 0;
+      for (ColumnChunkMetaData col : rowGroup.getColumns()) {
+        ColumnMetadata columnMetadata;
+
+        boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty());
+
+        Statistics stats = col.getStatistics();
+        SchemaPath columnName = SchemaPath.getCompoundPath(col.getPath().toArray());
+        if (statsAvailable) {
+          columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
+              stats.genericGetMax(), stats.genericGetMin(), stats.getNumNulls());
+        } else {
+          columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
+              null, null, null);
+        }
+        columnMetadataList.add(columnMetadata);
+        length += col.getTotalSize();
+      }
+
+      RowGroupMetadata rowGroupMeta = new RowGroupMetadata(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
+              getHostAffinity(file, rowGroup.getStartingPos(), length), columnMetadataList);
+
+      rowGroupMetadataList.add(rowGroupMeta);
+    }
+    String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
+
+    return new ParquetFileMetadata(path, file.getLen(), rowGroupMetadataList);
+  }
+
+  /**
+   * Get the host affinity for a row group
+   * @param fileStatus the parquet file
+   * @param start the start of the row group
+   * @param length the length of the row group
+   * @return
+   * @throws IOException
+   */
+  private Map<String,Float> getHostAffinity(FileStatus fileStatus, long start, long length) throws IOException {
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
+    Map<String,Float> hostAffinityMap = Maps.newHashMap();
+    for (BlockLocation blockLocation : blockLocations) {
+      for (String host : blockLocation.getHosts()) {
+        Float currentAffinity = hostAffinityMap.get(host);
+        float blockStart = blockLocation.getOffset();
+        float blockEnd = blockStart + blockLocation.getLength();
+        float rowGroupEnd = start + length;
+        Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) -
+                (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
+        if (currentAffinity != null) {
+          hostAffinityMap.put(host, currentAffinity + newAffinity);
+        } else {
+          hostAffinityMap.put(host, newAffinity);
+        }
+      }
+    }
+    return hostAffinityMap;
+  }
+
+  /**
+   * Serialize parquet metadata to json and write to a file
+   * @param parquetTableMetadata
+   * @param p
+   * @throws IOException
+   */
+  private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) throws IOException {
+    JsonFactory jsonFactory = new JsonFactory();
+    jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
+    jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+    ObjectMapper mapper = new ObjectMapper(jsonFactory);
+    FSDataOutputStream os = fs.create(p);
+    mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
+    os.flush();
+    os.close();
+  }
+
+  /**
+   * Read the parquet metadata from a file
+   * @param path
+   * @return
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException {
+    Path p = new Path(path);
+    ObjectMapper mapper = new ObjectMapper();
+    SimpleModule module = new SimpleModule();
+    module.addDeserializer(SchemaPath.class, new De());
+    mapper.registerModule(module);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    FSDataInputStream is = fs.open(p);
+    ParquetTableMetadata_v1 parquetTableMetadata = mapper.readValue(is, ParquetTableMetadata_v1.class);
+    if (tableModified(parquetTableMetadata, p)) {
+      parquetTableMetadata = createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString());
+    }
+    return parquetTableMetadata;
+  }
+
+  /**
+   * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
+   * the modification time of the metadata file
+   * @param tableMetadata
+   * @param metaFilePath
+   * @return
+   * @throws IOException
+   */
+  private boolean tableModified(ParquetTableMetadata_v1 tableMetadata, Path metaFilePath) throws IOException {
+    long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
+    FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
+    if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+      return true;
+    }
+    for (String directory : tableMetadata.directories) {
+      directoryStatus = fs.getFileStatus(new Path(directory));
+      if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version")
+  public static class ParquetTableMetadataBase {
+
+  }
+
+  /**
+   * Struct which contains the metadata for an entire parquet directory structure
+   */
+  @JsonTypeName("v1")
+  public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {
+    @JsonProperty
+    List<ParquetFileMetadata> files;
+    @JsonProperty
+    List<String> directories;
+
+    public ParquetTableMetadata_v1() {
+      super();
+    }
+
+    public ParquetTableMetadata_v1(List<ParquetFileMetadata> files, List<String> directories) {
+      this.files = files;
+      this.directories = directories;
+    }
+  }
+
+  /**
+   * Struct which contains the metadata for a single parquet file
+   */
+  public static class ParquetFileMetadata {
+    @JsonProperty
+    public String path;
+    @JsonProperty
+    public Long length;
+    @JsonProperty
+    public List<RowGroupMetadata> rowGroups;
+
+    public ParquetFileMetadata() {
+      super();
+    }
+
+    public ParquetFileMetadata(String path, Long length, List<RowGroupMetadata> rowGroups) {
+      this.path = path;
+      this.length = length;
+      this.rowGroups = rowGroups;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("path: %s rowGroups: %s", path, rowGroups);
+    }
+  }
+
+  /**
+   * A struct that contains the metadata for a parquet row group
+   */
+  public static class RowGroupMetadata {
+    @JsonProperty
+    public Long start;
+    @JsonProperty
+    public Long length;
+    @JsonProperty
+    public Long rowCount;
+    @JsonProperty
+    public Map<String, Float> hostAffinity;
+    @JsonProperty
+    public List<ColumnMetadata> columns;
+
+    public RowGroupMetadata() {
+      super();
+    }
+
+    public RowGroupMetadata(Long start, Long length, Long rowCount,
+                            Map<String, Float> hostAffinity, List<ColumnMetadata> columns) {
+      this.start = start;
+      this.length = length;
+      this.rowCount = rowCount;
+      this.hostAffinity = hostAffinity;
+      this.columns = columns;
+    }
+  }
+
+  /**
+   * A struct that contains the metadata for a column in a parquet file
+   */
+  public static class ColumnMetadata {
+    @JsonProperty
+    public SchemaPath name;
+    @JsonProperty
+    public PrimitiveTypeName primitiveType;
+    @JsonProperty
+    public OriginalType originalType;
+    @JsonProperty
+    public Object max;
+    @JsonProperty
+    public Object min;
+    @JsonProperty
+    public Long nulls;
+
+    public ColumnMetadata() {
+      super();
+    }
+
+    public ColumnMetadata(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType,
+                          Object max, Object min, Long nulls) {
+      this.name = name;
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+      this.max = max;
+      this.min = min;
+      this.nulls = nulls;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 446e12a..eeb522a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -217,6 +217,9 @@ public class ParquetFormatPlugin implements FormatPlugin{
           return true;
         } else {
 
+          if (fs.exists(new Path(dir.getPath(), Metadata.METADATA_FILENAME))) {
+            return true;
+          }
           PathFilter filter = new DrillPathFilter();
 
           FileStatus[] files = fs.listStatus(dir.getPath(), filter);

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 845bce9..00d36ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,30 +18,23 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.holders.IntervalHolder;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
@@ -56,27 +49,28 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
+import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
 import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.Float4Vector;
-import org.apache.drill.exec.vector.Float8Vector;
-import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal18Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableSmallIntVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
@@ -87,27 +81,14 @@ import org.apache.drill.exec.vector.NullableUInt4Vector;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.hadoop.security.UserGroupInformation;
 import org.joda.time.DateTimeUtils;
-import parquet.column.statistics.Statistics;
-import parquet.format.ConvertedType;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.io.api.Binary;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -118,7 +99,6 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import parquet.schema.OriginalType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
 
 @JsonTypeName("parquet-scan")
 public class ParquetGroupScan extends AbstractFileGroupScan {
@@ -126,6 +106,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
 
+
   private final List<ReadEntryWithPath> entries;
   private final Stopwatch watch = new Stopwatch();
   private final ParquetFormatPlugin formatPlugin;
@@ -171,7 +152,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
-    this.readFooterFromEntries();
+
+    init();
   }
 
   public ParquetGroupScan( //
@@ -195,7 +177,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     this.selectionRoot = selectionRoot;
 
-    readFooter(files);
+    init();
   }
 
   /*
@@ -238,137 +220,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return selectionRoot;
   }
 
-  private void readFooterFromEntries()  throws IOException {
-    List<FileStatus> files = Lists.newArrayList();
-    for (ReadEntryWithPath e : entries) {
-      files.add(fs.getFileStatus(new Path(e.getPath())));
-    }
-    readFooter(files);
-  }
-
-  private void readFooter(final List<FileStatus> statuses) {
-    final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
-    try {
-      ugi.doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws Exception {
-          readFooterHelper(statuses);
-          return null;
-        }
-      });
-    } catch (InterruptedException | IOException e) {
-      final String errMsg = String.format("Failed to read footer entries from parquet input files: %s", e.getMessage());
-      logger.error(errMsg, e);
-      throw new DrillRuntimeException(errMsg, e);
-    }
-  }
-
   public Set<String> getFileSet() {
     return fileSet;
   }
 
-  private Set<String> fileSet = Sets.newHashSet();
-
-  private void readFooterHelper(List<FileStatus> statuses) throws IOException {
-    watch.reset();
-    watch.start();
-    Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
-
-    columnTypeMap.clear();
-    fileSet.clear();
-    partitionValueMap.clear();
-
-    rowGroupInfos = Lists.newArrayList();
-    long start = 0, length = 0;
-    rowCount = 0;
-    columnValueCounts = new HashMap<SchemaPath, Long>();
-
-    ColumnChunkMetaData columnChunkMetaData;
-
-    List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
-    boolean first = true;
-    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-    for (Footer footer : footers) {
-      int index = 0;
-      ParquetMetadata metadata = footer.getParquetMetadata();
-      FileMetaData fileMetaData = metadataConverter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, metadata);
-      HashMap<String, SchemaElement> schemaElements = new HashMap<>();
-      for (SchemaElement se : fileMetaData.getSchema()) {
-        schemaElements.put(se.getName(), se);
-      }
-      for (BlockMetaData rowGroup : metadata.getBlocks()) {
-        String file = Path.getPathWithoutSchemeAndAuthority(footer.getFile()).toString();
-        fileSet.add(file);
-        long valueCountInGrp = 0;
-        // need to grab block information from HDFS
-        columnChunkMetaData = rowGroup.getColumns().iterator().next();
-        start = columnChunkMetaData.getFirstDataPageOffset();
-        // this field is not being populated correctly, but the column chunks know their sizes, just summing them for
-        // now
-        // end = start + rowGroup.getTotalByteSize();
-        length = 0;
-        for (ColumnChunkMetaData col : rowGroup.getColumns()) {
-          length += col.getTotalSize();
-          valueCountInGrp = Math.max(col.getValueCount(), valueCountInGrp);
-          SchemaPath schemaPath = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
-
-          long previousCount = 0;
-          long currentCount = 0;
-
-          if (! columnValueCounts.containsKey(schemaPath)) {
-            // create an entry for this column
-            columnValueCounts.put(schemaPath, previousCount /* initialize to 0 */);
-          } else {
-            previousCount = columnValueCounts.get(schemaPath);
-          }
-
-          boolean statsAvail = (col.getStatistics() != null && !col.getStatistics().isEmpty());
-
-          if (statsAvail && previousCount != GroupScan.NO_COLUMN_STATS) {
-            currentCount = col.getValueCount() - col.getStatistics().getNumNulls(); // only count non-nulls
-            columnValueCounts.put(schemaPath, previousCount + currentCount);
-          } else {
-            // even if 1 chunk does not have stats, we cannot rely on the value count for this column
-            columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
-          }
-
-          // check if this column can be used for partition pruning
-          SchemaElement se = schemaElements.get(schemaPath.getAsUnescapedPath());
-          boolean partitionColumn = checkForPartitionColumn(schemaPath, col, se, first);
-          if (partitionColumn) {
-            Map<SchemaPath,Object> map = partitionValueMap.get(file);
-            if (map == null) {
-              map = Maps.newHashMap();
-              partitionValueMap.put(file, map);
-            }
-            Object value = map.get(schemaPath);
-            Object currentValue = col.getStatistics().genericGetMax();
-            if (value != null) {
-              if (value != currentValue) {
-                columnTypeMap.remove(schemaPath);
-              }
-            } else {
-              map.put(schemaPath, currentValue);
-            }
-          } else {
-            columnTypeMap.remove(schemaPath);
-          }
-        }
-
-        String filePath = footer.getFile().toUri().getPath();
-        rowGroupInfos.add(new ParquetGroupScan.RowGroupInfo(filePath, start, length, index));
-        logger.debug("rowGroupInfo path: {} start: {} length {}", filePath, start, length);
-        index++;
-
-        rowCount += rowGroup.getRowCount();
-        first = false;
-      }
-
-    }
-    Preconditions.checkState(!rowGroupInfos.isEmpty(), "No row groups found");
-    tContext.stop();
-    watch.stop();
-    logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
-  }
+  private Set<String> fileSet;
 
   @JsonIgnore
   private Map<SchemaPath,MajorType> columnTypeMap = Maps.newHashMap();
@@ -378,30 +234,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the
       * remaining footers, we will not find any new partition columns, but we may discover that what was previously a
       * potential partition column now no longer qualifies, so it needs to be removed from the list.
-      * @param column
-      * @param columnChunkMetaData
-      * @param se
-      * @param first
       * @return whether column is a potential partition column
       */
-  private boolean checkForPartitionColumn(SchemaPath column, ColumnChunkMetaData columnChunkMetaData, SchemaElement se, boolean first) {
+  private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) {
+    SchemaPath schemaPath = columnMetadata.name;
     if (first) {
-      if (hasSingleValue(columnChunkMetaData)) {
-        columnTypeMap.put(column, getType(columnChunkMetaData, se));
+      if (hasSingleValue(columnMetadata)) {
+        columnTypeMap.put(schemaPath, getType(columnMetadata.primitiveType, columnMetadata.originalType));
         return true;
       } else {
         return false;
       }
     } else {
-      if (!columnTypeMap.keySet().contains(column)) {
+      if (!columnTypeMap.keySet().contains(schemaPath)) {
         return false;
       } else {
-        if (!hasSingleValue(columnChunkMetaData)) {
-          columnTypeMap.remove(column);
+        if (!hasSingleValue(columnMetadata)) {
+          columnTypeMap.remove(schemaPath);
           return false;
         }
-        if (!getType(columnChunkMetaData, se).equals(columnTypeMap.get(column))) {
-          columnTypeMap.remove(column);
+        if (!getType(columnMetadata.primitiveType, columnMetadata.originalType).equals(columnTypeMap.get(schemaPath))) {
+          columnTypeMap.remove(schemaPath);
           return false;
         }
       }
@@ -409,9 +262,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return true;
   }
 
-  private MajorType getType(ColumnChunkMetaData columnChunkMetaData, SchemaElement schemaElement) {
-    ConvertedType originalType = schemaElement == null ? null : schemaElement.getConverted_type();
-
+  private MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
     if (originalType != null) {
       switch (originalType) {
       case DECIMAL:
@@ -439,7 +290,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       }
     }
 
-    PrimitiveTypeName type = columnChunkMetaData.getType();
     switch (type) {
     case BOOLEAN:
       return Types.optional(MinorType.BIT);
@@ -460,25 +310,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
-  private boolean hasSingleValue(ColumnChunkMetaData columnChunkMetaData) {
-    Statistics stats = columnChunkMetaData.getStatistics();
-    boolean hasStats = stats != null && !stats.isEmpty();
-    if (hasStats) {
-      if (stats.genericGetMin() == null || stats.genericGetMax() == null) {
-        return false;
-      }
-      return stats.genericGetMax().equals(stats.genericGetMin());
-    } else {
-      return false;
-    }
+  private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) {
+    Object max = columnChunkMetaData.max;
+    Object min = columnChunkMetaData.min;
+    return max != null && max.equals(min);
   }
 
   @Override
   public void modifyFileSelection(FileSelection selection) {
     entries.clear();
+    fileSet = Sets.newHashSet();
     for (String fileName : selection.getAsFiles()) {
       entries.add(new ReadEntryWithPath(fileName));
+      fileSet.add(fileName);
+    }
+
+    List<RowGroupInfo> newRowGroupList = Lists.newArrayList();
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      if (fileSet.contains(rowGroupInfo.getPath())) {
+        newRowGroupList.add(rowGroupInfo);
+      }
     }
+    this.rowGroupInfos = newRowGroupList;
   }
 
   public MajorType getTypeForColumn(SchemaPath schemaPath) {
@@ -629,6 +482,125 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
+  private void init() throws IOException {
+    ParquetTableMetadata_v1 parquetTableMetadata;
+    List<FileStatus> fileStatuses = null;
+    if (entries.size() == 1) {
+      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
+      Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
+      if (fs.exists(metaPath)) {
+        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+      } else {
+        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
+      }
+    } else {
+      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
+      Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
+      if (fs.exists(metaPath)) {
+        parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
+      } else {
+        fileStatuses = Lists.newArrayList();
+        for (ReadEntryWithPath entry : entries) {
+          getFiles(entry.getPath(), fileStatuses);
+        }
+        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses);
+      }
+    }
+
+    if (fileSet == null) {
+      fileSet = Sets.newHashSet();
+      for (ParquetFileMetadata file : parquetTableMetadata.files) {
+        fileSet.add(file.path);
+      }
+    }
+
+    Map<String,DrillbitEndpoint> hostEndpointMap = Maps.newHashMap();
+
+    for (DrillbitEndpoint endpoint : formatPlugin.getContext().getBits()) {
+      hostEndpointMap.put(endpoint.getAddress(), endpoint);
+    }
+
+    rowGroupInfos = Lists.newArrayList();
+    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+      int rgIndex = 0;
+      for (RowGroupMetadata rg : file.rowGroups) {
+        RowGroupInfo rowGroupInfo = new RowGroupInfo(file.path, rg.start, rg.length, rgIndex);
+        EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
+        for (String host : rg.hostAffinity.keySet()) {
+          if (hostEndpointMap.containsKey(host)) {
+            endpointByteMap.add(hostEndpointMap.get(host), (long) (rg.hostAffinity.get(host) * rg.length));
+          }
+        }
+        rowGroupInfo.setEndpointByteMap(endpointByteMap);
+        rgIndex++;
+        rowGroupInfos.add(rowGroupInfo);
+      }
+    }
+
+    this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+
+    columnValueCounts = Maps.newHashMap();
+    this.rowCount = 0;
+    boolean first = true;
+    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+      for (RowGroupMetadata rowGroup : file.rowGroups) {
+        long rowCount = rowGroup.rowCount;
+        for (ColumnMetadata column : rowGroup.columns) {
+          SchemaPath schemaPath = column.name;
+          Long previousCount = columnValueCounts.get(schemaPath);
+          if (previousCount != null) {
+            if (previousCount != GroupScan.NO_COLUMN_STATS) {
+              if (column.nulls != null) {
+                Long newCount = rowCount - column.nulls;
+                columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
+              } else {
+
+              }
+            }
+          } else {
+            if (column.nulls != null) {
+              Long newCount = rowCount - column.nulls;
+              columnValueCounts.put(schemaPath, newCount);
+            } else {
+              columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+            }
+          }
+          boolean partitionColumn = checkForPartitionColumn(column, first);
+          if (partitionColumn) {
+            Map<SchemaPath,Object> map = partitionValueMap.get(file.path);
+            if (map == null) {
+              map = Maps.newHashMap();
+              partitionValueMap.put(file.path, map);
+            }
+            Object value = map.get(schemaPath);
+            Object currentValue = column.max;
+            if (value != null) {
+              if (value != currentValue) {
+                columnTypeMap.remove(schemaPath);
+              }
+            } else {
+              map.put(schemaPath, currentValue);
+            }
+          } else {
+            columnTypeMap.remove(schemaPath);
+          }
+        }
+        this.rowCount += rowGroup.rowCount;
+        first = false;
+      }
+    }
+  }
+
+  private ParquetTableMetadata_v1 removeUnneededRowGroups(ParquetTableMetadata_v1 parquetTableMetadata) {
+    List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
+    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+      if (fileSet.contains(file.path)) {
+        newFileMetadataList.add(file);
+      }
+    }
+    return new ParquetTableMetadata_v1(newFileMetadataList, new ArrayList<String>());
+  }
+
   /**
    * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
    * rowGroup
@@ -637,23 +609,19 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    */
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
+    return this.endpointAffinities;
+  }
 
-    if (this.endpointAffinities == null) {
-      BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits());
-      try {
-        List<TimedRunnable<Void>> blockMappers = Lists.newArrayList();
-        for (RowGroupInfo rgi : rowGroupInfos) {
-          blockMappers.add(new BlockMapper(bmb, rgi));
-        }
-        TimedRunnable.run("Load Parquet RowGroup block maps", logger, blockMappers, 16);
-      } catch (IOException e) {
-        logger.warn("Failure while determining operator affinity.", e);
-        return Collections.emptyList();
+  private void getFiles(String path, List<FileStatus> fileStatuses) throws IOException {
+    Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+    FileStatus fileStatus = fs.getFileStatus(p);
+    if (fileStatus.isDirectory()) {
+      for (FileStatus f : fs.listStatus(p, new DrillPathFilter())) {
+        getFiles(f.getPath().toString(), fileStatuses);
       }
-
-      this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+    } else {
+      fileStatuses.add(fileStatus);
     }
-    return this.endpointAffinities;
   }
 
   private class BlockMapper extends TimedRunnable<Void> {
@@ -704,9 +672,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
     List<RowGroupReadEntry> entries = Lists.newArrayList();
     for (RowGroupInfo rgi : rowGroups) {
-      RowGroupReadEntry rgre = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(),
-          rgi.getRowGroupIndex());
-      entries.add(rgre);
+      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex());
+      entries.add(entry);
     }
     return entries;
   }
@@ -757,7 +724,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   public FileGroupScan clone(FileSelection selection) throws IOException {
     ParquetGroupScan newScan = new ParquetGroupScan(this);
     newScan.modifyFileSelection(selection);
-    newScan.readFooterFromEntries();
+    newScan.init();
     return newScan;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
index fea0875..22d4ef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
@@ -45,7 +45,6 @@ public class AffinityCreator {
       for (ObjectLongCursor<DrillbitEndpoint> cursor : entry.getByteMap()) {
         long bytes = cursor.value;
         float affinity = (float)bytes / (float)totalBytes;
-        logger.debug("Work: {} Endpoint: {} Bytes: {}", work, cursor.key.getAddress(), bytes);
         affinities.putOrAdd(cursor.key, affinity, affinity);
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0ee60958/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
new file mode 100644
index 0000000..ccaa9e7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Joiner;
+import org.apache.drill.BaseTestQuery;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+public class TestParquetMetadataCache extends BaseTestQuery {
+
+  @Test
+  public void testCache() throws Exception {
+    String tableName = "nation_ctas";
+    test("use dfs_test.tmp");
+    test(String.format("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName));
+    test(String.format("refresh table metadata %s", tableName));
+    checkForMetadataFile(tableName);
+    int rowCount = testSql(String.format("select * from %s", tableName));
+    Assert.assertEquals(25, rowCount);
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    String tableName = "nation_ctas_update";
+    test("use dfs_test.tmp");
+    test(String.format("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName));
+    test(String.format("refresh table metadata %s", tableName));
+    checkForMetadataFile(tableName);
+    Thread.sleep(1000);
+    test(String.format("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName));
+    int rowCount = testSql(String.format("select * from %s", tableName));
+    Assert.assertEquals(50, rowCount);
+  }
+
+  private void checkForMetadataFile(String table) throws Exception {
+    String tmpDir = getDfsTestTmpSchemaLocation();
+    String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME);
+    Assert.assertTrue(Files.exists(new File(metaFile).toPath()));
+  }
+}


[2/2] drill git commit: reverting a fix no longer needed after DRILL-3767

Posted by ad...@apache.org.
reverting a fix no longer needed after DRILL-3767


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1716cb08
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1716cb08
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1716cb08

Branch: refs/heads/master
Commit: 1716cb083f6c873e45b2745ce449fbf605d9d16c
Parents: 0ee6095
Author: adeneche <ad...@gmail.com>
Authored: Fri Sep 11 14:51:52 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Fri Sep 18 16:19:27 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/drill/exec/store/parquet/Metadata.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1716cb08/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 02414a4..58f6d2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -240,7 +240,7 @@ public class Metadata {
     if (type.isPrimitive()) {
       return type.getOriginalType();
     }
-    Type t = ((GroupType) type).getType(path[path.length - depth - 1]);
+    Type t = ((GroupType) type).getType(path[depth]);
     return getOriginalType(t, path, depth + 1);
   }