You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2021/09/10 00:24:41 UTC

Change in asterixdb[master]: [NO ISSUE][EXT] Disable Hadoop FileSystem Cache

From Wael Alkowaileet <wa...@gmail.com>:

Wael Alkowaileet has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13145 )


Change subject: [NO ISSUE][EXT] Disable Hadoop FileSystem Cache
......................................................................

[NO ISSUE][EXT] Disable Hadoop FileSystem Cache

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Hadoop can cache FileSystem instances together with the credentials
they hold (e.g., S3 tokens). Disable this cache so that the
credentials are not retained, avoiding the risks that caching them poses.

Change-Id: Icc36ddf013eadff0fe1cca7c2e52fcd5f2bbbb5b
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
3 files changed, 52 insertions(+), 34 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/45/13145/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 243b87c..f7d9de2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -63,6 +63,8 @@
     public static final String KEY_HADOOP_ASTERIX_WARNINGS_ENABLED = "org.apache.asterix.warnings.enabled";
     //Base64 encoded warnings issued from Hadoop
     public static final String KEY_HADOOP_ASTERIX_WARNINGS_LIST = "org.apache.asterix.warnings.list";
+    //Disable caching FileSystem for Hadoop
+    public static final String KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE = "fs.%s.impl.disable.cache";
     //Base64 encoded function call information
     public static final String KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION = "org.apache.asterix.function.info";
     public static final String KEY_SOURCE_DATATYPE = "type-name";
@@ -84,7 +86,6 @@
     public static final String KEY_ESCAPE = "escape";
     public static final String KEY_PARSER = "parser";
     public static final String KEY_DATASET_RECORD = "dataset-record";
-    public static final String KEY_HIVE_SERDE = "hive-serde";
     public static final String KEY_RSS_URL = "url";
     public static final String KEY_INTERVAL = "interval";
     public static final String KEY_IS_FEED = "is-feed";
@@ -178,7 +179,6 @@
     /**
      * supported builtin record formats
      */
-    public static final String FORMAT_HIVE = "hive";
     public static final String FORMAT_BINARY = "binary";
     public static final String FORMAT_ADM = "adm";
     public static final String FORMAT_JSON_LOWER_CASE = "json";
@@ -198,7 +198,6 @@
 
     static {
         Set<String> formats = new HashSet<>(14);
-        formats.add(FORMAT_HIVE);
         formats.add(FORMAT_BINARY);
         formats.add(FORMAT_ADM);
         formats.add(FORMAT_JSON_LOWER_CASE);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index dd44436..ae3aa5f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -31,6 +31,7 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_CREDENTIAL_PROVIDER_KEY;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_PATH_STYLE_ACCESS;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_CONNECTION_POOL_SIZE;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SECRET_ACCESS_KEY;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
@@ -76,6 +77,18 @@
 import java.util.regex.PatternSyntaxException;
 import java.util.stream.Stream;
 
+import com.azure.identity.ClientCertificateCredentialBuilder;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.google.api.gax.paging.Page;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -112,20 +125,6 @@
 import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.util.StorageUtil;
-
-import com.azure.identity.ClientCertificateCredentialBuilder;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.ListBlobsOptions;
-import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.cloud.storage.Blob;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -231,7 +230,8 @@
     }
 
     public static IInputStreamFactory createExternalInputStreamFactory(ILibraryManager libraryManager,
-            DataverseName dataverse, String stream) throws HyracksDataException {
+                                                                       DataverseName dataverse, String stream)
+            throws HyracksDataException {
         try {
             String libraryName = getLibraryName(stream);
             String className = getExternalClassName(stream);
@@ -300,13 +300,15 @@
 
     // Currently not used.
     public static IRecordReaderFactory<?> createExternalRecordReaderFactory(ILibraryManager libraryManager,
-            Map<String, String> configuration) throws AsterixException {
+                                                                            Map<String, String> configuration)
+            throws AsterixException {
         String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
         if (readerFactory == null) {
             throw new AsterixException("to use " + ExternalDataConstants.EXTERNAL + " reader, the parameter "
                     + ExternalDataConstants.KEY_READER_FACTORY + " must be specified.");
         }
-        String[] libraryAndFactory = readerFactory.split(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
+        String[] libraryAndFactory = readerFactory.split(
+                ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
         if (libraryAndFactory.length != 2) {
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                     + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
@@ -316,7 +318,8 @@
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                     + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
         }
-        DataverseName dataverseName = DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
+        DataverseName dataverseName =
+                DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
         String libraryName = dataverseAndLibrary[1];
         ILibrary lib;
         try {
@@ -337,7 +340,8 @@
 
     // Currently not used.
     public static IDataParserFactory createExternalParserFactory(ILibraryManager libraryManager,
-            DataverseName dataverse, String parserFactoryName) throws AsterixException {
+                                                                 DataverseName dataverse, String parserFactoryName)
+            throws AsterixException {
         try {
             String library = parserFactoryName.substring(0,
                     parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
@@ -573,7 +577,7 @@
      * @param configuration properties
      */
     public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
+                                                         IWarningCollector collector) throws CompilationException {
         String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
 
         switch (type) {
@@ -921,12 +925,15 @@
          * @param numberOfPartitions number of partitions in the cluster
          */
         public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
-                int numberOfPartitions) {
+                                                     int numberOfPartitions) {
             String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
             String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
             String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
             String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
 
+            //Disable caching S3 FileSystem
+            HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
+
             /*
              * Authentication Methods:
              * 1- Anonymous: no accessKeyId and no secretAccessKey
@@ -973,7 +980,7 @@
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                                              IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1070,7 +1077,7 @@
          * @param includeExcludeMatcher include/exclude matchers to apply
          */
         public static List<S3Object> listS3Objects(Map<String, String> configuration,
-                IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector)
+                                                   IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector)
                 throws CompilationException {
             // Prepare to retrieve the objects
             List<S3Object> filesOnly;
@@ -1119,7 +1126,7 @@
          * @param includeExcludeMatcher include/exclude matchers to apply
          */
         private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
-                IncludeExcludeMatcher includeExcludeMatcher) {
+                                                    IncludeExcludeMatcher includeExcludeMatcher) {
             String newMarker = null;
             List<S3Object> filesOnly = new ArrayList<>();
 
@@ -1160,7 +1167,7 @@
          * @param includeExcludeMatcher include/exclude matchers to apply
          */
         private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
-                IncludeExcludeMatcher includeExcludeMatcher) {
+                                                          IncludeExcludeMatcher includeExcludeMatcher) {
             String newMarker = null;
             List<S3Object> filesOnly = new ArrayList<>();
 
@@ -1198,7 +1205,7 @@
          * @param s3Objects List of returned objects
          */
         private static void collectAndFilterFiles(List<S3Object> s3Objects,
-                BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<S3Object> filesOnly) {
+                                                  BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<S3Object> filesOnly) {
             for (S3Object object : s3Objects) {
                 // skip folders
                 if (object.key().endsWith("/")) {
@@ -1329,7 +1336,7 @@
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                                              IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1402,7 +1409,7 @@
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                                              IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e774b40..0f508cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -131,7 +131,7 @@
                         fileSplits
                                 .add(new FileSplit(filePath,
                                         block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
-                                                ? block.getLength() : (file.getSize() - block.getOffset()),
+                                        ? block.getLength() : (file.getSize() - block.getOffset()),
                                         block.getHosts()));
                         orderedExternalFiles.add(file);
                     }
@@ -248,7 +248,7 @@
     }
 
     public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
-            AlgebricksAbsolutePartitionConstraint clusterLocations) {
+                                                                                AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
             return ((ICcApplicationContext) appCtx).getClusterStateManager().getClusterLocations();
         }
@@ -284,7 +284,7 @@
     }
 
     public static void setFunctionCallInformationMap(Map<String, FunctionCallInformation> funcCallInfoMap,
-            Configuration conf) throws IOException {
+                                                     Configuration conf) throws IOException {
         String stringFunctionCallInfoMap = ExternalDataUtils.serializeFunctionCallInfoToString(funcCallInfoMap);
         conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, stringFunctionCallInfoMap);
     }
@@ -336,4 +336,16 @@
             conf.unset(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_LIST);
         }
     }
+
+    /**
+     * Hadoop can cache FileSystem instance if reading the same file. This method allows for disabling the cache
+     *
+     * @param conf     Hadoop configuration
+     * @param protocol fs scheme (or protocol). e.g., s3a
+     */
+    public static void disableHadoopFileSystemCache(Configuration conf, String protocol) {
+        //Disable fs cache
+        conf.set(String.format(ExternalDataConstants.KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE, protocol),
+                ExternalDataConstants.TRUE);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13145
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icc36ddf013eadff0fe1cca7c2e52fcd5f2bbbb5b
Gerrit-Change-Number: 13145
Gerrit-PatchSet: 1
Gerrit-Owner: Wael Alkowaileet <wa...@gmail.com>
Gerrit-MessageType: newchange

Change in asterixdb[master]: [NO ISSUE][EXT] Disable Hadoop FileSystem Cache

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Wael Alkowaileet <wa...@gmail.com>:

Wael Alkowaileet has uploaded a new patch set (#2). ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13145 )

Change subject: [NO ISSUE][EXT] Disable Hadoop FileSystem Cache
......................................................................

[NO ISSUE][EXT] Disable Hadoop FileSystem Cache

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Hadoop can cache FileSystem instances together with the credentials
they hold (e.g., S3 tokens). Disable this cache so that the
credentials are not retained, avoiding the risks that caching them poses.

Change-Id: Icc36ddf013eadff0fe1cca7c2e52fcd5f2bbbb5b
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
3 files changed, 18 insertions(+), 3 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/45/13145/2
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13145
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icc36ddf013eadff0fe1cca7c2e52fcd5f2bbbb5b
Gerrit-Change-Number: 13145
Gerrit-PatchSet: 2
Gerrit-Owner: Wael Alkowaileet <wa...@gmail.com>
Gerrit-CC: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-MessageType: newpatchset

Change in asterixdb[master]: [NO ISSUE][EXT] Disable Hadoop FileSystem Cache

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Wael Alkowaileet <wa...@gmail.com>:

Wael Alkowaileet has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13145 )


Change subject: [NO ISSUE][EXT] Disable Hadoop FileSystem Cache
......................................................................

[NO ISSUE][EXT] Disable Hadoop FileSystem Cache

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Hadoop can cache FileSystem instances together with the credentials
they hold (e.g., S3 tokens). Disable this cache so that the
credentials are not retained, avoiding the risks that caching them poses.

Change-Id: Icc36ddf013eadff0fe1cca7c2e52fcd5f2bbbb5b
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
3 files changed, 52 insertions(+), 34 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/45/13145/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 243b87c..f7d9de2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -63,6 +63,8 @@
     public static final String KEY_HADOOP_ASTERIX_WARNINGS_ENABLED = "org.apache.asterix.warnings.enabled";
     //Base64 encoded warnings issued from Hadoop
     public static final String KEY_HADOOP_ASTERIX_WARNINGS_LIST = "org.apache.asterix.warnings.list";
+    //Disable caching FileSystem for Hadoop
+    public static final String KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE = "fs.%s.impl.disable.cache";
     //Base64 encoded function call information
     public static final String KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION = "org.apache.asterix.function.info";
     public static final String KEY_SOURCE_DATATYPE = "type-name";
@@ -84,7 +86,6 @@
     public static final String KEY_ESCAPE = "escape";
     public static final String KEY_PARSER = "parser";
     public static final String KEY_DATASET_RECORD = "dataset-record";
-    public static final String KEY_HIVE_SERDE = "hive-serde";
     public static final String KEY_RSS_URL = "url";
     public static final String KEY_INTERVAL = "interval";
     public static final String KEY_IS_FEED = "is-feed";
@@ -178,7 +179,6 @@
     /**
      * supported builtin record formats
      */
-    public static final String FORMAT_HIVE = "hive";
     public static final String FORMAT_BINARY = "binary";
     public static final String FORMAT_ADM = "adm";
     public static final String FORMAT_JSON_LOWER_CASE = "json";
@@ -198,7 +198,6 @@
 
     static {
         Set<String> formats = new HashSet<>(14);
-        formats.add(FORMAT_HIVE);
         formats.add(FORMAT_BINARY);
         formats.add(FORMAT_ADM);
         formats.add(FORMAT_JSON_LOWER_CASE);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index dd44436..ae3aa5f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -31,6 +31,7 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_CREDENTIAL_PROVIDER_KEY;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_PATH_STYLE_ACCESS;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_CONNECTION_POOL_SIZE;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SECRET_ACCESS_KEY;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
@@ -76,6 +77,18 @@
 import java.util.regex.PatternSyntaxException;
 import java.util.stream.Stream;
 
+import com.azure.identity.ClientCertificateCredentialBuilder;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.google.api.gax.paging.Page;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -112,20 +125,6 @@
 import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.util.StorageUtil;
-
-import com.azure.identity.ClientCertificateCredentialBuilder;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.ListBlobsOptions;
-import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.cloud.storage.Blob;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -231,7 +230,8 @@
     }
 
     public static IInputStreamFactory createExternalInputStreamFactory(ILibraryManager libraryManager,
-            DataverseName dataverse, String stream) throws HyracksDataException {
+                                                                       DataverseName dataverse, String stream)
+            throws HyracksDataException {
         try {
             String libraryName = getLibraryName(stream);
             String className = getExternalClassName(stream);
@@ -300,13 +300,15 @@
 
     // Currently not used.
     public static IRecordReaderFactory<?> createExternalRecordReaderFactory(ILibraryManager libraryManager,
-            Map<String, String> configuration) throws AsterixException {
+                                                                            Map<String, String> configuration)
+            throws AsterixException {
         String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
         if (readerFactory == null) {
             throw new AsterixException("to use " + ExternalDataConstants.EXTERNAL + " reader, the parameter "
                     + ExternalDataConstants.KEY_READER_FACTORY + " must be specified.");
         }
-        String[] libraryAndFactory = readerFactory.split(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
+        String[] libraryAndFactory = readerFactory.split(
+                ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
         if (libraryAndFactory.length != 2) {
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                     + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
@@ -316,7 +318,8 @@
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                     + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
         }
-        DataverseName dataverseName = DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
+        DataverseName dataverseName =
+                DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
         String libraryName = dataverseAndLibrary[1];
         ILibrary lib;
         try {
@@ -337,7 +340,8 @@
 
     // Currently not used.
     public static IDataParserFactory createExternalParserFactory(ILibraryManager libraryManager,
-            DataverseName dataverse, String parserFactoryName) throws AsterixException {
+                                                                 DataverseName dataverse, String parserFactoryName)
+            throws AsterixException {
         try {
             String library = parserFactoryName.substring(0,
                     parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
@@ -573,7 +577,7 @@
      * @param configuration properties
      */
     public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
+                                                         IWarningCollector collector) throws CompilationException {
         String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
 
         switch (type) {
@@ -921,12 +925,15 @@
          * @param numberOfPartitions number of partitions in the cluster
          */
         public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
-                int numberOfPartitions) {
+                                                     int numberOfPartitions) {
             String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
             String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
             String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
             String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
 
+            //Disable caching S3 FileSystem
+            HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
+
             /*
              * Authentication Methods:
              * 1- Anonymous: no accessKeyId and no secretAccessKey
@@ -973,7 +980,7 @@
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                                              IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1070,7 +1077,7 @@
          * @param includeExcludeMatcher include/exclude matchers to apply
          */
         public static List<S3Object> listS3Objects(Map<String, String> configuration,
-                IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector)
+                                                   IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector)
                 throws CompilationException {
             // Prepare to retrieve the objects
             List<S3Object> filesOnly;
@@ -1119,7 +1126,7 @@
          * @param includeExcludeMatcher include/exclude matchers to apply
          */
         private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
-                IncludeExcludeMatcher includeExcludeMatcher) {
+                                                    IncludeExcludeMatcher includeExcludeMatcher) {
             String newMarker = null;
             List<S3Object> filesOnly = new ArrayList<>();
 
@@ -1160,7 +1167,7 @@
          * @param includeExcludeMatcher include/exclude matchers to apply
          */
         private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
-                IncludeExcludeMatcher includeExcludeMatcher) {
+                                                          IncludeExcludeMatcher includeExcludeMatcher) {
             String newMarker = null;
             List<S3Object> filesOnly = new ArrayList<>();
 
@@ -1198,7 +1205,7 @@
          * @param s3Objects List of returned objects
          */
         private static void collectAndFilterFiles(List<S3Object> s3Objects,
-                BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<S3Object> filesOnly) {
+                                                  BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<S3Object> filesOnly) {
             for (S3Object object : s3Objects) {
                 // skip folders
                 if (object.key().endsWith("/")) {
@@ -1329,7 +1336,7 @@
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                                              IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1402,7 +1409,7 @@
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                                              IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e774b40..0f508cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -131,7 +131,7 @@
                         fileSplits
                                 .add(new FileSplit(filePath,
                                         block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
-                                                ? block.getLength() : (file.getSize() - block.getOffset()),
+                                        ? block.getLength() : (file.getSize() - block.getOffset()),
                                         block.getHosts()));
                         orderedExternalFiles.add(file);
                     }
@@ -248,7 +248,7 @@
     }
 
     public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
-            AlgebricksAbsolutePartitionConstraint clusterLocations) {
+                                                                                AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
             return ((ICcApplicationContext) appCtx).getClusterStateManager().getClusterLocations();
         }
@@ -284,7 +284,7 @@
     }
 
     public static void setFunctionCallInformationMap(Map<String, FunctionCallInformation> funcCallInfoMap,
             Configuration conf) throws IOException {
         String stringFunctionCallInfoMap = ExternalDataUtils.serializeFunctionCallInfoToString(funcCallInfoMap);
         conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, stringFunctionCallInfoMap);
     }
@@ -336,4 +336,16 @@
             conf.unset(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_LIST);
         }
     }
+
+    /**
+     * Disables Hadoop's per-scheme FileSystem cache so instances (and any credentials they hold) are not reused.
+     *
+     * @param conf     Hadoop configuration; modified in place
+     * @param protocol fs scheme (or protocol), e.g., s3a; substituted into KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE
+     */
+    public static void disableHadoopFileSystemCache(Configuration conf, String protocol) {
+        // Set the scheme-specific key (presumably "fs.<protocol>.impl.disable.cache" -- confirm template) to TRUE
+        conf.set(String.format(ExternalDataConstants.KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE, protocol),
+                ExternalDataConstants.TRUE);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13145
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icc36ddf013eadff0fe1cca7c2e52fcd5f2bbbb5b
Gerrit-Change-Number: 13145
Gerrit-PatchSet: 1
Gerrit-Owner: Wael Alkowaileet <wa...@gmail.com>
Gerrit-MessageType: newchange