You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2019/03/19 18:29:44 UTC

[drill] 02/04: DRILL-7111: Fix table function execution for directories

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 8712ffd8be479b7f71bb65f605f41b5135e74216
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Mon Mar 18 18:19:39 2019 +0200

    DRILL-7111: Fix table function execution for directories
    
    closes #1700
---
 .../exec/store/dfs/WorkspaceSchemaFactory.java     | 69 +++++++++++++---------
 .../org/apache/drill/TestSelectWithOption.java     | 39 ++++++++++--
 2 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index b6b29e7..2e302f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -140,7 +140,7 @@ public class WorkspaceSchemaFactory {
         throw new ExecutionSetupException(message);
       }
       final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
-          ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
+          ImmutableList.of(Pattern.compile(".*")), ImmutableList.of());
       fileMatchers.add(fallbackMatcher);
       dropFileMatchers = fileMatchers.subList(0, fileMatchers.size() - 1);
     } else {
@@ -163,11 +163,10 @@ public class WorkspaceSchemaFactory {
    * Checks whether a FileSystem object has the permission to list/read workspace directory
    * @param fs a DrillFileSystem object that was created with certain user privilege
    * @return True if the user has access. False otherwise.
-   * @throws IOException
    */
   public boolean accessible(DrillFileSystem fs) throws IOException {
     try {
-      /**
+      /*
        * For Windows local file system, fs.access ends up using DeprecatedRawLocalFileStatus which has
        * TrustedInstaller as owner, and a member of Administrators group could not satisfy the permission.
        * In this case, we will still use method listStatus.
@@ -427,11 +426,10 @@ public class WorkspaceSchemaFactory {
     // Drill Process User file-system
     private DrillFileSystem dpsFs;
 
-    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException {
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) {
       super(parentSchemaPath, wsName);
       this.schemaConfig = schemaConfig;
       this.fs = fs;
-      //this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
       this.dpsFs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fsConf);
     }
 
@@ -722,10 +720,6 @@ public class WorkspaceSchemaFactory {
       return FileSystemConfig.NAME;
     }
 
-    private DrillTable isReadable(FormatMatcher m, FileSelection fileSelection) throws IOException {
-      return m.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig);
-    }
-
     @Override
     public DrillTable create(TableInstance key) {
       try {
@@ -734,13 +728,20 @@ public class WorkspaceSchemaFactory {
           return null;
         }
 
-        final boolean hasDirectories = fileSelection.containsDirectories(getFS());
+        boolean hasDirectories = fileSelection.containsDirectories(getFS());
+
         if (key.sig.params.size() > 0) {
-          FormatPluginConfig fconfig = optionExtractor.createConfigForTable(key);
-          return new DynamicDrillTable(
-              plugin, storageEngineName, schemaConfig.getUserName(),
-              new FormatSelection(fconfig, fileSelection));
+          FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
+
+          if (newSelection.isEmptyDirectory()) {
+            return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
+          }
+
+          FormatPluginConfig formatConfig = optionExtractor.createConfigForTable(key);
+          FormatSelection selection = new FormatSelection(formatConfig, newSelection);
+          return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
         }
+
         if (hasDirectories) {
           for (final FormatMatcher matcher : dirMatchers) {
             try {
@@ -754,10 +755,8 @@ public class WorkspaceSchemaFactory {
           }
         }
 
-        final FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
-        if (newSelection == null) {
-          // empty directory / selection means that this is the empty and schemaless table
-          fileSelection.setEmptyDirectoryStatus();
+        FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
+        if (newSelection.isEmptyDirectory()) {
           return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
         }
 
@@ -783,8 +782,25 @@ public class WorkspaceSchemaFactory {
       return null;
     }
 
+    /**
+     * Expands given file selection if it has directories.
+     * If expanded file selection is null (i.e. directory is empty), sets empty directory status to true.
+     *
+     * @param fileSelection file selection
+     * @param hasDirectories flag that indicates if given file selection has directories
+     * @return expanded file selection, or the given file selection marked as an empty directory if the expansion is null
+     */
+    private FileSelection detectEmptySelection(FileSelection fileSelection, boolean hasDirectories) throws IOException {
+      FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
+      if (newSelection == null) {
+        // an empty directory / selection means that this is an empty and schemaless table
+        fileSelection.setEmptyDirectoryStatus();
+        return fileSelection;
+      }
+      return newSelection;
+    }
+
     private FormatMatcher findMatcher(FileStatus file) {
-      FormatMatcher matcher = null;
       try {
         for (FormatMatcher m : dropFileMatchers) {
           if (m.isFileReadable(getFS(), file)) {
@@ -794,7 +810,7 @@ public class WorkspaceSchemaFactory {
       } catch (IOException e) {
         logger.debug("Failed to find format matcher for file: %s", file, e);
       }
-      return matcher;
+      return null;
     }
 
     @Override
@@ -820,8 +836,7 @@ public class WorkspaceSchemaFactory {
       }
 
       FormatMatcher matcher = null;
-      Queue<FileStatus> listOfFiles = new LinkedList<>();
-      listOfFiles.addAll(fileSelection.getStatuses(getFS()));
+      Queue<FileStatus> listOfFiles = new LinkedList<>(fileSelection.getStatuses(getFS()));
 
       while (!listOfFiles.isEmpty()) {
         FileStatus currentFile = listOfFiles.poll();
@@ -865,13 +880,13 @@ public class WorkspaceSchemaFactory {
         StringBuilder tableRenameBuilder = new StringBuilder();
         int lastSlashIndex = table.lastIndexOf(Path.SEPARATOR);
         if (lastSlashIndex != -1) {
-          tableRenameBuilder.append(table.substring(0, lastSlashIndex + 1));
+          tableRenameBuilder.append(table, 0, lastSlashIndex + 1);
         }
         // Generate unique identifier which will be added as a suffix to the table name
         ThreadLocalRandom r = ThreadLocalRandom.current();
         long time =  (System.currentTimeMillis()/1000);
-        Long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
-        Long p2 = r.nextLong();
+        long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
+        long p2 = r.nextLong();
         final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX;
         String[] pathSplit = table.split(Path.SEPARATOR);
         /*
@@ -884,9 +899,9 @@ public class WorkspaceSchemaFactory {
             .append(DrillFileSystem.UNDERSCORE_PREFIX)
             .append(pathSplit[pathSplit.length - 1])
             .append(fileNameDelimiter)
-            .append(p1.toString())
+            .append(p1)
             .append(fileNameDelimiter)
-            .append(p2.toString());
+            .append(p2);
 
         String tableRename = tableRenameBuilder.toString();
         fs.rename(new Path(defaultLocation, table), new Path(defaultLocation, tableRename));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index a6dff74..ec23194 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -25,10 +25,11 @@ import static org.junit.Assert.assertThat;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.file.Paths;
 
 import org.apache.drill.categories.SqlTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestBuilder;
 import org.junit.Test;
@@ -36,13 +37,12 @@ import org.junit.experimental.categories.Category;
 
 @Category(SqlTest.class)
 public class TestSelectWithOption extends BaseTestQuery {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
 
   private File genCSVFile(String name, String... rows) throws IOException {
     File file = new File(format("%s/%s.csv", dirTestWatcher.getRootDir(), name));
     try (FileWriter fw = new FileWriter(file)) {
-      for (int i = 0; i < rows.length; i++) {
-        fw.append(rows[i] + "\n");
+      for (String row : rows) {
+        fw.append(row).append("\n");
       }
     }
     return file;
@@ -291,4 +291,35 @@ public class TestSelectWithOption extends BaseTestQuery {
       throw e;
     }
   }
+
+  @Test
+  public void testTableFunctionWithDirectoryExpansion() throws Exception {
+    String tableName = "dirTable";
+    String query = "select 'A' as col from (values(1))";
+    test("use dfs.tmp");
+    try {
+      alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+      test("create table %s as %s", tableName, query);
+
+      testBuilder()
+        .sqlQuery("select * from table(%s(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
+        .unOrdered()
+        .sqlBaselineQuery(query)
+        .go();
+    } finally {
+      resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION);
+      test("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testTableFunctionWithEmptyDirectory() throws Exception {
+    String tableName = "emptyTable";
+    dirTestWatcher.makeTestTmpSubDir(Paths.get(tableName));
+    testBuilder()
+      .sqlQuery("select * from table(dfs.tmp.`%s`(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
+      .expectsEmptyResultSet()
+      .go();
+  }
+
 }