You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2019/03/21 19:13:35 UTC
[drill] branch master updated: DRILL-7125: REFRESH TABLE METADATA
fails after upgrade from Drill 1.13.0 to Drill 1.15.0
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new bd3ee4c DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0
bd3ee4c is described below
commit bd3ee4cde5ced05362b538cd7aacbdab73ccef88
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Mar 20 13:29:40 2019 -0700
DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0
---
.../sql/handlers/RefreshMetadataHandler.java | 33 ++++----
.../exec/store/parquet/metadata/Metadata.java | 88 +++++++++++++---------
2 files changed, 72 insertions(+), 49 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 0a02f03..d7bb036 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -17,18 +17,14 @@
*/
package org.apache.drill.exec.planner.sql.handlers;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.drill.common.expression.SchemaPath;
-import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlNode;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.logical.DrillTable;
@@ -37,15 +33,20 @@ import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.hadoop.fs.Path;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
+
public class RefreshMetadataHandler extends DefaultSqlHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class);
@@ -107,18 +108,20 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
return notSupported(tableName);
}
- FormatSelection formatSelection = (FormatSelection) selection;
+ final FormatSelection formatSelection = (FormatSelection) selection;
FormatPluginConfig formatConfig = formatSelection.getFormat();
if (!((formatConfig instanceof ParquetFormatConfig) ||
- ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+ ((formatConfig instanceof NamedFormatPluginConfig) &&
+ ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
return notSupported(tableName);
}
- FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
- DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+ // Always create filesystem object using process user, since it owns the metadata file
+ final DrillFileSystem fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
+ drillTable.getPlugin().getFormatPlugin(formatConfig).getFsConf());
- Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+ final Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
if (!fs.getFileStatus(selectionRoot).isDirectory()) {
return notSupported(tableName);
}
@@ -127,7 +130,7 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
formatConfig = new ParquetFormatConfig();
}
- ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+ final ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
.withFormatConfig((ParquetFormatConfig) formatConfig)
.withOptions(context.getOptions())
.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index cd4aec2..c61c458 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -24,23 +24,21 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
-import org.apache.drill.exec.serialization.PathSerDe;
-import java.util.Set;
-import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.collections.Collectors;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.serialization.PathSerDe;
import org.apache.drill.exec.store.TimedCallable;
import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -69,6 +67,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -85,7 +84,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3;
/**
- * This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig}
+ * This is a utility class, a holder for Parquet Table Metadata and {@link ParquetReaderConfig}. All creation of the
+ * parquet metadata cache through the create APIs is forced to happen as the process user, since only that user will
+ * have write permission for the cache file.
*/
public class Metadata {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
@@ -114,7 +115,7 @@ public class Metadata {
*/
public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig, boolean allColumns, Set<String> columnSet) throws IOException {
Metadata metadata = new Metadata(readerConfig);
- metadata.createMetaFilesRecursively(path, fs, allColumns, columnSet);
+ metadata.createMetaFilesRecursivelyAsProcessUser(path, fs, allColumns, columnSet);
}
/**
@@ -204,6 +205,26 @@ public class Metadata {
}
/**
+ * Wrapper which makes sure that the metadata file is always created as the process user, no matter what the caller
+ * passes in.
+ * @param path to the directory of the parquet table
+ * @param fs file system
+ * @param allColumns if set, store column metadata for all the columns
+ * @param columnSet Set of columns for which column metadata has to be stored
+ * @return Pair of parquet metadata. The left one is the parquet metadata for the table. The right one of the Pair is
+ * the metadata for all subdirectories (if they are present and there are no parquet files in the
+ * {@code path} directory).
+ * @throws IOException if parquet metadata can't be serialized and written to the json file
+ */
+ private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs>
+ createMetaFilesRecursivelyAsProcessUser(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet)
+ throws IOException {
+ final FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
+ fs.getConf());
+ return createMetaFilesRecursively(path, processUserFileSystem, allColumns, columnSet);
+ }
+
+ /**
* Create the parquet metadata files for the directory at the given path and for any subdirectories.
* Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths.
*
@@ -216,21 +237,23 @@ public class Metadata {
* {@code path} directory).
* @throws IOException if parquet metadata can't be serialized and written to the json file
*/
- private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) throws IOException {
+ private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs>
+ createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet)
+ throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList();
List<Path> directoryList = Lists.newArrayList();
ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet =
new ConcurrentHashMap<>();
- Path p = path;
- FileStatus fileStatus = fs.getFileStatus(p);
+ FileStatus fileStatus = fs.getFileStatus(path);
assert fileStatus.isDirectory() : "Expected directory";
final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>();
- for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
+ for (final FileStatus file : DrillFileSystemUtil.listAll(fs, path, false)) {
if (file.isDirectory()) {
- ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns, columnSet)).getLeft();
+ ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns,
+ columnSet)).getLeft();
metaDataList.addAll(subTableMetadata.files);
directoryList.addAll(subTableMetadata.directories);
directoryList.add(file.getPath());
@@ -259,17 +282,17 @@ public class Metadata {
parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet);
for (String oldName : OLD_METADATA_FILENAMES) {
- fs.delete(new Path(p, oldName), false);
+ fs.delete(new Path(path, oldName), false);
}
// relative paths in the metadata are only necessary for meta cache files.
ParquetTableMetadata_v3 metadataTableWithRelativePaths =
MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path);
- writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME), fs);
+ writeFile(metadataTableWithRelativePaths, new Path(path, METADATA_FILENAME), fs);
if (directoryList.size() > 0 && childFiles.size() == 0) {
ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths =
new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories);
- writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME), fs);
+ writeFile(parquetTableMetadataDirsRelativePaths, new Path(path, METADATA_DIRECTORIES_FILENAME), fs);
if (timer != null) {
logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
}
@@ -610,7 +633,7 @@ public class Metadata {
parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
parquetTableMetadataDirs =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight();
+ (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight();
newMetadata = true;
}
} else {
@@ -625,7 +648,7 @@ public class Metadata {
if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
// TODO change with current columns in existing metadata (auto refresh feature)
parquetTableMetadata =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft();
+ (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft();
newMetadata = true;
}
@@ -664,32 +687,29 @@ public class Metadata {
FileStatus directoryStatus = fs.getFileStatus(parentDir);
int numDirs = 1;
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
- if (timer != null) {
- logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
- directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
- timer.stop();
- }
- return true;
+ return logAndStopTimer(true, directoryStatus.getPath().toString(), timer, numDirs);
}
+ boolean isModified = false;
for (Path directory : directories) {
numDirs++;
metaContext.setStatus(directory);
directoryStatus = fs.getFileStatus(directory);
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
- if (timer != null) {
- logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
- directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
- timer.stop();
- }
- return true;
+ isModified = true;
+ break;
}
}
+ return logAndStopTimer(isModified, directoryStatus.getPath().toString(), timer, numDirs);
+ }
+
+ private boolean logAndStopTimer(boolean isModified, String directoryName,
+ Stopwatch timer, int numDirectories) {
if (timer != null) {
- logger.debug("No directories were modified. Took {} ms to check modification time of {} directories",
- timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+ logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories",
+ isModified ? directoryName : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirectories);
timer.stop();
}
- return false;
+ return isModified;
}
}