Posted to commits@drill.apache.org by so...@apache.org on 2019/03/21 19:13:35 UTC

[drill] branch master updated: DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3ee4c  DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0
bd3ee4c is described below

commit bd3ee4cde5ced05362b538cd7aacbdab73ccef88
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Mar 20 13:29:40 2019 -0700

    DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0
---
 .../sql/handlers/RefreshMetadataHandler.java       | 33 ++++----
 .../exec/store/parquet/metadata/Metadata.java      | 88 +++++++++++++---------
 2 files changed, 72 insertions(+), 49 deletions(-)
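
As the comments in the diff note, the metadata cache file is owned by the Drillbit process user, so a refresh that goes through the impersonated session user's filesystem can fail on permissions against a cache written by an earlier release. The patch therefore binds all cache creation to a process-user filesystem. A minimal sketch of that pattern, reusing the ImpersonationUtil calls that appear in the diff below (the wrapper class and method name are illustrative, not part of the commit):

    import org.apache.drill.exec.store.dfs.DrillFileSystem;
    import org.apache.drill.exec.util.ImpersonationUtil;
    import org.apache.hadoop.conf.Configuration;

    final class ProcessUserFileSystems {
      // Bind filesystem I/O to the Drillbit process user, who owns the
      // metadata cache files, instead of the impersonated query user.
      static DrillFileSystem forMetadataCache(Configuration fsConf) {
        return ImpersonationUtil.createFileSystem(
            ImpersonationUtil.getProcessUserName(), fsConf);
      }
    }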

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 0a02f03..d7bb036 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -17,18 +17,14 @@
  */
 package org.apache.drill.exec.planner.sql.handlers;
 
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.drill.common.expression.SchemaPath;
-import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -37,15 +33,20 @@ import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.hadoop.fs.Path;
 
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
+
 public class RefreshMetadataHandler extends DefaultSqlHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class);
 
@@ -107,18 +108,20 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
         return notSupported(tableName);
       }
 
-      FormatSelection formatSelection = (FormatSelection) selection;
+      final FormatSelection formatSelection = (FormatSelection) selection;
 
       FormatPluginConfig formatConfig = formatSelection.getFormat();
       if (!((formatConfig instanceof ParquetFormatConfig) ||
-          ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+          ((formatConfig instanceof NamedFormatPluginConfig) &&
+            ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
         return notSupported(tableName);
       }
 
-      FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
-      DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+      // Always create the filesystem object as the process user, since that user owns the metadata file
+      final DrillFileSystem fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
+        drillTable.getPlugin().getFormatPlugin(formatConfig).getFsConf());
 
-      Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+      final Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
       if (!fs.getFileStatus(selectionRoot).isDirectory()) {
         return notSupported(tableName);
       }
@@ -127,7 +130,7 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
         formatConfig = new ParquetFormatConfig();
       }
 
-      ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+      final ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
         .withFormatConfig((ParquetFormatConfig) formatConfig)
         .withOptions(context.getOptions())
         .build();
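
With the handler change above, a successful REFRESH TABLE METADATA leaves the cache file owned by the process user no matter which session user issued the statement. A hypothetical sanity check along those lines (this helper is not in the commit; METADATA_FILENAME in the diff resolves to .drill.parquet_metadata):

    import java.io.IOException;
    import org.apache.drill.exec.util.ImpersonationUtil;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class CacheOwnershipCheck {
      // True if the table's metadata cache file is owned by the Drillbit
      // process user, as the fixed handler guarantees after a refresh.
      static boolean ownedByProcessUser(FileSystem fs, Path tableRoot) throws IOException {
        return fs.getFileStatus(new Path(tableRoot, ".drill.parquet_metadata"))
            .getOwner()
            .equals(ImpersonationUtil.getProcessUserName());
      }
    }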
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index cd4aec2..c61c458 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -24,23 +24,21 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
-import org.apache.drill.exec.serialization.PathSerDe;
-import java.util.Set;
-import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.collections.Collectors;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.serialization.PathSerDe;
 import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -69,6 +67,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -85,7 +84,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa
 import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3;
 
 /**
- * This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig}
+ * This is a utility class, a holder for Parquet table metadata and {@link ParquetReaderConfig}. All creation of
+ * parquet metadata cache files via the create APIs is forced to happen as the process user, since only that user
+ * has write permission for the cache file.
  */
 public class Metadata {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
@@ -114,7 +115,7 @@ public class Metadata {
    */
   public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig, boolean allColumns, Set<String> columnSet) throws IOException {
     Metadata metadata = new Metadata(readerConfig);
-    metadata.createMetaFilesRecursively(path, fs, allColumns, columnSet);
+    metadata.createMetaFilesRecursivelyAsProcessUser(path, fs, allColumns, columnSet);
   }
 
   /**
@@ -204,6 +205,26 @@ public class Metadata {
   }
 
   /**
+   * Wrapper which ensures that the metadata file is always created as the process user, no matter which file
+   * system the caller passes in.
+   * @param path to the directory of the parquet table
+   * @param fs file system
+   * @param allColumns if set, store column metadata for all the columns
+   * @param columnSet Set of columns for which column metadata has to be stored
+   * @return Pair of parquet metadata. The left one is the parquet metadata for the table. The right one is the
+   *         metadata for all subdirectories (if they are present and there are no parquet files in the
+   *         {@code path} directory).
+   * @throws IOException if parquet metadata can't be serialized and written to the json file
+   */
+  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs>
+  createMetaFilesRecursivelyAsProcessUser(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet)
+    throws IOException {
+    final FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
+      fs.getConf());
+    return createMetaFilesRecursively(path, processUserFileSystem, allColumns, columnSet);
+  }
+
+  /**
    * Create the parquet metadata files for the directory at the given path and for any subdirectories.
    * Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths.
    *
@@ -216,21 +237,23 @@ public class Metadata {
    *         {@code path} directory).
    * @throws IOException if parquet metadata can't be serialized and written to the json file
    */
-  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) throws IOException {
+  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs>
+  createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet)
+    throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList();
     List<Path> directoryList = Lists.newArrayList();
     ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet =
         new ConcurrentHashMap<>();
-    Path p = path;
-    FileStatus fileStatus = fs.getFileStatus(p);
+    FileStatus fileStatus = fs.getFileStatus(path);
     assert fileStatus.isDirectory() : "Expected directory";
 
     final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>();
 
-    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
+    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, path, false)) {
       if (file.isDirectory()) {
-        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns, columnSet)).getLeft();
+        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns,
+          columnSet)).getLeft();
         metaDataList.addAll(subTableMetadata.files);
         directoryList.addAll(subTableMetadata.directories);
         directoryList.add(file.getPath());
@@ -259,17 +282,17 @@ public class Metadata {
     parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet);
 
     for (String oldName : OLD_METADATA_FILENAMES) {
-      fs.delete(new Path(p, oldName), false);
+      fs.delete(new Path(path, oldName), false);
     }
     //  relative paths in the metadata are only necessary for meta cache files.
     ParquetTableMetadata_v3 metadataTableWithRelativePaths =
         MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path);
-    writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME), fs);
+    writeFile(metadataTableWithRelativePaths, new Path(path, METADATA_FILENAME), fs);
 
     if (directoryList.size() > 0 && childFiles.size() == 0) {
       ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths =
           new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories);
-      writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME), fs);
+      writeFile(parquetTableMetadataDirsRelativePaths, new Path(path, METADATA_DIRECTORIES_FILENAME), fs);
       if (timer != null) {
         logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
       }
@@ -610,7 +633,7 @@ public class Metadata {
         parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
         if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
           parquetTableMetadataDirs =
-              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight();
+              (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight();
           newMetadata = true;
         }
       } else {
@@ -625,7 +648,7 @@ public class Metadata {
           if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
           // TODO change with current columns in existing metadata (auto refresh feature)
           parquetTableMetadata =
-              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft();
+              (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft();
           newMetadata = true;
         }
 
@@ -664,32 +687,29 @@ public class Metadata {
     FileStatus directoryStatus = fs.getFileStatus(parentDir);
     int numDirs = 1;
     if (directoryStatus.getModificationTime() > metaFileModifyTime) {
-      if (timer != null) {
-        logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
-            directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
-        timer.stop();
-      }
-      return true;
+      return logAndStopTimer(true, directoryStatus.getPath().toString(), timer, numDirs);
     }
+    boolean isModified = false;
     for (Path directory : directories) {
       numDirs++;
       metaContext.setStatus(directory);
       directoryStatus = fs.getFileStatus(directory);
       if (directoryStatus.getModificationTime() > metaFileModifyTime) {
-        if (timer != null) {
-          logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
-              directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
-          timer.stop();
-        }
-        return true;
+        isModified = true;
+        break;
       }
     }
+    return logAndStopTimer(isModified, directoryStatus.getPath().toString(), timer, numDirs);
+  }
+
+  private boolean logAndStopTimer(boolean isModified, String directoryName,
+                                  Stopwatch timer, int numDirectories) {
     if (timer != null) {
-      logger.debug("No directories were modified. Took {} ms to check modification time of {} directories",
-          timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+      logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories",
+        isModified ? directoryName : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirectories);
       timer.stop();
     }
-    return false;
+    return isModified;
   }
 
 }
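
Beyond the impersonation fix, the last hunk deduplicates tableModified(): the three near-identical "log elapsed time and stop the Stopwatch" blocks collapse into the single logAndStopTimer() helper with one exit point. A standalone sketch of the same pattern (plain Guava and slf4j here, whereas Drill imports the shaded Guava package; class and method names are illustrative):

    import com.google.common.base.Stopwatch;
    import java.util.concurrent.TimeUnit;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class TimedCheck {
      private static final Logger logger = LoggerFactory.getLogger(TimedCheck.class);

      // Single exit point: report the outcome and stop the optional timer once,
      // instead of repeating the debug block before every early return.
      static boolean logAndStop(boolean modified, String dirName, Stopwatch timer, int numDirs) {
        if (timer != null) {
          logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories",
              modified ? dirName : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
          timer.stop();
        }
        return modified;
      }
    }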