You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2019/01/04 06:54:07 UTC

[GitHub] gparai closed pull request #1576: DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache is disabled

gparai closed pull request #1576: DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache is disabled
URL: https://github.com/apache/drill/pull/1576
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it after the merge):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 721e80002d1..11dc2042146 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -33,13 +33,11 @@
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -72,20 +70,17 @@ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer)
     Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
-
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-
     options.put("separator", " ");
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put("extension", "json");
     options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
     options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
     options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
     options.put("enableNanInf", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR)));
-    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
+
+    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy(), getFsConf());
     recordWriter.init(options);
 
     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 9e6aaf8d9dd..2e80b3ffb14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -64,8 +64,11 @@
   // Record write status
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
 
-  public JsonRecordWriter(StorageStrategy storageStrategy){
+  private Configuration fsConf;
+
+  public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) {
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+    this.fsConf = new Configuration(fsConf);
   }
 
   @Override
@@ -78,9 +81,7 @@ public void init(Map<String, String> writerOptions) throws IOException {
     this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
     final boolean uglify = Boolean.parseBoolean(writerOptions.get("uglify"));
 
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.get(fsConf);
 
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bc129ae1d1a..2ac24d8d07c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,6 @@
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
@@ -50,7 +49,6 @@
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
@@ -117,17 +115,14 @@ public RecordWriter getRecordWriter(final FragmentContext context, final EasyWri
     final Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
-
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-
     options.put("separator", getConfig().getFieldDelimiterAsString());
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put("extension", getConfig().getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
+    RecordWriter recordWriter = new DrillTextRecordWriter(
+        context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
     recordWriter.init(options);
 
     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 2c409963a0f..f46cc1cf299 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -47,7 +47,6 @@
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
@@ -140,8 +139,6 @@ public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter write
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
 
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
     options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
       context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45233c4da4e..5a64f4070d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -132,6 +132,7 @@ public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws
     this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
     this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.DEFAULT : writer.getStorageStrategy();
     this.cleanUpLocations = Lists.newArrayList();
+    this.conf = new Configuration(writer.getFormatPlugin().getFsConf());
   }
 
   @Override
@@ -139,8 +140,6 @@ public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");
     this.prefix = writerOptions.get("prefix");
 
-    conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
     fs = FileSystem.get(conf);
     blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
     pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 7b7c47fe4c9..83a00bd1784 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -56,9 +56,12 @@
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
   private StringBuilder currentRecord; // contains the current record separated by field delimiter
 
-  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
+  private Configuration fsConf;
+
+  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
     super(allocator);
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+    this.fsConf = new Configuration(fsConf);
   }
 
   @Override
@@ -68,9 +71,7 @@ public void init(Map<String, String> writerOptions) throws IOException {
     this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
 
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.get(fsConf);
 
     this.currentRecord = new StringBuilder();
     this.index = 0;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services