You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@drill.apache.org by ar...@apache.org on 2019/03/19 18:29:44 UTC
[drill] 02/04: DRILL-7111: Fix table function execution for directories
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 8712ffd8be479b7f71bb65f605f41b5135e74216
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Mon Mar 18 18:19:39 2019 +0200
DRILL-7111: Fix table function execution for directories
closes #1700
---
.../exec/store/dfs/WorkspaceSchemaFactory.java | 69 +++++++++++++---------
.../org/apache/drill/TestSelectWithOption.java | 39 ++++++++++--
2 files changed, 77 insertions(+), 31 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index b6b29e7..2e302f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -140,7 +140,7 @@ public class WorkspaceSchemaFactory {
throw new ExecutionSetupException(message);
}
final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
- ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
+ ImmutableList.of(Pattern.compile(".*")), ImmutableList.of());
fileMatchers.add(fallbackMatcher);
dropFileMatchers = fileMatchers.subList(0, fileMatchers.size() - 1);
} else {
@@ -163,11 +163,10 @@ public class WorkspaceSchemaFactory {
* Checks whether a FileSystem object has the permission to list/read workspace directory
* @param fs a DrillFileSystem object that was created with certain user privilege
* @return True if the user has access. False otherwise.
- * @throws IOException
*/
public boolean accessible(DrillFileSystem fs) throws IOException {
try {
- /**
+ /*
* For Windows local file system, fs.access ends up using DeprecatedRawLocalFileStatus which has
* TrustedInstaller as owner, and a member of Administrators group could not satisfy the permission.
* In this case, we will still use method listStatus.
@@ -427,11 +426,10 @@ public class WorkspaceSchemaFactory {
// Drill Process User file-system
private DrillFileSystem dpsFs;
- public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException {
+ public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) {
super(parentSchemaPath, wsName);
this.schemaConfig = schemaConfig;
this.fs = fs;
- //this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
this.dpsFs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fsConf);
}
@@ -722,10 +720,6 @@ public class WorkspaceSchemaFactory {
return FileSystemConfig.NAME;
}
- private DrillTable isReadable(FormatMatcher m, FileSelection fileSelection) throws IOException {
- return m.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig);
- }
-
@Override
public DrillTable create(TableInstance key) {
try {
@@ -734,13 +728,20 @@ public class WorkspaceSchemaFactory {
return null;
}
- final boolean hasDirectories = fileSelection.containsDirectories(getFS());
+ boolean hasDirectories = fileSelection.containsDirectories(getFS());
+
if (key.sig.params.size() > 0) {
- FormatPluginConfig fconfig = optionExtractor.createConfigForTable(key);
- return new DynamicDrillTable(
- plugin, storageEngineName, schemaConfig.getUserName(),
- new FormatSelection(fconfig, fileSelection));
+ FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
+
+ if (newSelection.isEmptyDirectory()) {
+ return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
+ }
+
+ FormatPluginConfig formatConfig = optionExtractor.createConfigForTable(key);
+ FormatSelection selection = new FormatSelection(formatConfig, newSelection);
+ return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
}
+
if (hasDirectories) {
for (final FormatMatcher matcher : dirMatchers) {
try {
@@ -754,10 +755,8 @@ public class WorkspaceSchemaFactory {
}
}
- final FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
- if (newSelection == null) {
- // empty directory / selection means that this is the empty and schemaless table
- fileSelection.setEmptyDirectoryStatus();
+ FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
+ if (newSelection.isEmptyDirectory()) {
return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
}
@@ -783,8 +782,25 @@ public class WorkspaceSchemaFactory {
return null;
}
+ /**
+ * Expands given file selection if it has directories.
+ * If expanded file selection is null (i.e. directory is empty), sets empty directory status to true.
+ *
+ * @param fileSelection file selection
+ * @param hasDirectories flag that indicates if given file selection has directories
+ * @return revisited file selection
+ */
+ private FileSelection detectEmptySelection(FileSelection fileSelection, boolean hasDirectories) throws IOException {
+ FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
+ if (newSelection == null) {
+ // empty directory / selection means that this is the empty and schemaless table
+ fileSelection.setEmptyDirectoryStatus();
+ return fileSelection;
+ }
+ return newSelection;
+ }
+
private FormatMatcher findMatcher(FileStatus file) {
- FormatMatcher matcher = null;
try {
for (FormatMatcher m : dropFileMatchers) {
if (m.isFileReadable(getFS(), file)) {
@@ -794,7 +810,7 @@ public class WorkspaceSchemaFactory {
} catch (IOException e) {
logger.debug("Failed to find format matcher for file: %s", file, e);
}
- return matcher;
+ return null;
}
@Override
@@ -820,8 +836,7 @@ public class WorkspaceSchemaFactory {
}
FormatMatcher matcher = null;
- Queue<FileStatus> listOfFiles = new LinkedList<>();
- listOfFiles.addAll(fileSelection.getStatuses(getFS()));
+ Queue<FileStatus> listOfFiles = new LinkedList<>(fileSelection.getStatuses(getFS()));
while (!listOfFiles.isEmpty()) {
FileStatus currentFile = listOfFiles.poll();
@@ -865,13 +880,13 @@ public class WorkspaceSchemaFactory {
StringBuilder tableRenameBuilder = new StringBuilder();
int lastSlashIndex = table.lastIndexOf(Path.SEPARATOR);
if (lastSlashIndex != -1) {
- tableRenameBuilder.append(table.substring(0, lastSlashIndex + 1));
+ tableRenameBuilder.append(table, 0, lastSlashIndex + 1);
}
// Generate unique identifier which will be added as a suffix to the table name
ThreadLocalRandom r = ThreadLocalRandom.current();
long time = (System.currentTimeMillis()/1000);
- Long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
- Long p2 = r.nextLong();
+ long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
+ long p2 = r.nextLong();
final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX;
String[] pathSplit = table.split(Path.SEPARATOR);
/*
@@ -884,9 +899,9 @@ public class WorkspaceSchemaFactory {
.append(DrillFileSystem.UNDERSCORE_PREFIX)
.append(pathSplit[pathSplit.length - 1])
.append(fileNameDelimiter)
- .append(p1.toString())
+ .append(p1)
.append(fileNameDelimiter)
- .append(p2.toString());
+ .append(p2);
String tableRename = tableRenameBuilder.toString();
fs.rename(new Path(defaultLocation, table), new Path(defaultLocation, tableRename));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index a6dff74..ec23194 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -25,10 +25,11 @@ import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.nio.file.Paths;
import org.apache.drill.categories.SqlTest;
import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.TestBuilder;
import org.junit.Test;
@@ -36,13 +37,12 @@ import org.junit.experimental.categories.Category;
@Category(SqlTest.class)
public class TestSelectWithOption extends BaseTestQuery {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
private File genCSVFile(String name, String... rows) throws IOException {
File file = new File(format("%s/%s.csv", dirTestWatcher.getRootDir(), name));
try (FileWriter fw = new FileWriter(file)) {
- for (int i = 0; i < rows.length; i++) {
- fw.append(rows[i] + "\n");
+ for (String row : rows) {
+ fw.append(row).append("\n");
}
}
return file;
@@ -291,4 +291,35 @@ public class TestSelectWithOption extends BaseTestQuery {
throw e;
}
}
+
+ @Test
+ public void testTableFunctionWithDirectoryExpansion() throws Exception {
+ String tableName = "dirTable";
+ String query = "select 'A' as col from (values(1))";
+ test("use dfs.tmp");
+ try {
+ alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+ test("create table %s as %s", tableName, query);
+
+ testBuilder()
+ .sqlQuery("select * from table(%s(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
+ .unOrdered()
+ .sqlBaselineQuery(query)
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION);
+ test("drop table if exists %s", tableName);
+ }
+ }
+
+ @Test
+ public void testTableFunctionWithEmptyDirectory() throws Exception {
+ String tableName = "emptyTable";
+ dirTestWatcher.makeTestTmpSubDir(Paths.get(tableName));
+ testBuilder()
+ .sqlQuery("select * from table(dfs.tmp.`%s`(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
+ .expectsEmptyResultSet()
+ .go();
+ }
+
}