You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@drill.apache.org by gp...@apache.org on 2019/01/04 06:54:02 UTC

[drill] 07/10: DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache is disabled

This is an automated email from the ASF dual-hosted git repository.

gparai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a9331361c72d47c98ae16087e865bdf61eb01d96
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Fri Dec 14 19:42:51 2018 +0200

    DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache is disabled
    
    - pass the storage plugin's file system configuration to JsonRecordWriter, ParquetRecordWriter and DrillTextRecordWriter instead of having each writer build a new Configuration that only set fs.defaultFS
    closes #1576
---
 .../org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java  | 9 ++-------
 .../org/apache/drill/exec/store/easy/json/JsonRecordWriter.java  | 9 +++++----
 .../org/apache/drill/exec/store/easy/text/TextFormatPlugin.java  | 9 ++-------
 .../org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java | 3 ---
 .../org/apache/drill/exec/store/parquet/ParquetRecordWriter.java | 3 +--
 .../org/apache/drill/exec/store/text/DrillTextRecordWriter.java  | 9 +++++----
 6 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 721e800..11dc204 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -33,13 +33,11 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -72,20 +70,17 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
-
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-
     options.put("separator", " ");
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put("extension", "json");
     options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
     options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
     options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
     options.put("enableNanInf", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR)));
-    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
+
+    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy(), getFsConf());
     recordWriter.init(options);
 
     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 9e6aaf8..2e80b3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -64,8 +64,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   // Record write status
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
 
-  public JsonRecordWriter(StorageStrategy storageStrategy){
+  private Configuration fsConf;
+
+  public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) {
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+    this.fsConf = new Configuration(fsConf);
   }
 
   @Override
@@ -78,9 +81,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
     this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
     final boolean uglify = Boolean.parseBoolean(writerOptions.get("uglify"));
 
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.get(fsConf);
 
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bc129ae..2ac24d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
@@ -50,7 +49,6 @@ import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
@@ -117,17 +115,14 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     final Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
-
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-
     options.put("separator", getConfig().getFieldDelimiterAsString());
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put("extension", getConfig().getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
+    RecordWriter recordWriter = new DrillTextRecordWriter(
+        context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
     recordWriter.init(options);
 
     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 2c40996..f46cc1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -47,7 +47,6 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
@@ -140,8 +139,6 @@ public class ParquetFormatPlugin implements FormatPlugin {
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
 
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
     options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
       context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45233c4..5a64f40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -132,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
     this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.DEFAULT : writer.getStorageStrategy();
     this.cleanUpLocations = Lists.newArrayList();
+    this.conf = new Configuration(writer.getFormatPlugin().getFsConf());
   }
 
   @Override
@@ -139,8 +140,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     this.location = writerOptions.get("location");
     this.prefix = writerOptions.get("prefix");
 
-    conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
     fs = FileSystem.get(conf);
     blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
     pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 7b7c47f..83a00bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -56,9 +56,12 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
   private StringBuilder currentRecord; // contains the current record separated by field delimiter
 
-  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
+  private Configuration fsConf;
+
+  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
     super(allocator);
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+    this.fsConf = new Configuration(fsConf);
   }
 
   @Override
@@ -68,9 +71,7 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
     this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
 
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.get(fsConf);
 
     this.currentRecord = new StringBuilder();
     this.index = 0;