You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2021/04/07 12:47:49 UTC

[asterixdb] 08/25: [NO ISSUE][EXT] Avoid duplicate open for streams + minor refactoring

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 27792084826c46d5980c0aafa43affc580e2c576
Author: Hussain Towaileb <Hu...@Couchbase.com>
AuthorDate: Tue Mar 30 23:56:35 2021 +0300

    [NO ISSUE][EXT] Avoid duplicate open for streams + minor refactoring
    
    Change-Id: I405e84a30ee67b176c3389db6fd026c408ae1685
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10783
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../abstracts/AbstractExternalInputStreamFactory.java     | 13 +++++++++++--
 .../input/record/reader/aws/AwsS3InputStream.java         | 11 ++++++-----
 .../input/record/reader/aws/AwsS3InputStreamFactory.java  | 15 ++++-----------
 .../input/record/reader/azure/AzureBlobInputStream.java   | 12 ++++++------
 .../record/reader/azure/AzureBlobInputStreamFactory.java  | 15 ++++-----------
 .../asterix/external/util/ExternalDataConstants.java      |  4 +---
 .../apache/asterix/external/util/ExternalDataUtils.java   |  4 ++--
 7 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ca55b6f..0b215a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -30,6 +30,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -69,9 +70,17 @@ public abstract class AbstractExternalInputStreamFactory implements IInputStream
         return partitionConstraint;
     }
 
+    protected int getPartitionsCount() {
+        return getPartitionConstraint().getLocations().length;
+    }
+
     @Override
-    public abstract void configure(IServiceContext ctx, Map<String, String> configuration,
-            IWarningCollector warningCollector) throws AlgebricksException;
+    public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+            throws AlgebricksException {
+        this.configuration = configuration;
+        this.partitionConstraint =
+                ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
+    }
 
     /**
      * Finds the smallest workload and returns it
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index e3e53d5..48035f3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -42,15 +42,17 @@ import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 public class AwsS3InputStream extends AbstractExternalInputStream {
 
     private final S3Client s3Client;
+    private final String bucket;
 
     public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
         super(configuration, filePaths);
         this.s3Client = buildAwsS3Client(configuration);
+        this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
 
     @Override
     protected boolean getInputStream() throws IOException {
-        String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+        String fileName = filePaths.get(nextFileIndex);
         GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
         GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
 
@@ -67,11 +69,10 @@ public class AwsS3InputStream extends AbstractExternalInputStream {
         }
 
         // Use gzip stream if needed
-        String filename = filePaths.get(nextFileIndex).toLowerCase();
-        if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
-            in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+        String lowerCaseFileName = fileName.toLowerCase();
+        if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+            in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
         }
-
         return true;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 5bab888..a1c577a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,19 +18,17 @@
  */
 package org.apache.asterix.external.input.record.reader.aws;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -61,8 +59,7 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory
     @Override
     public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
             throws AlgebricksException {
-        this.configuration = configuration;
-        ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+        super.configure(ctx, configuration, warningCollector);
 
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
@@ -70,7 +67,7 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory
 
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
-        String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
 
         try {
@@ -101,12 +98,8 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory
             warningCollector.warn(warning);
         }
 
-        // Partition constraints
-        partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
-        int partitionsCount = partitionConstraint.getLocations().length;
-
         // Distribute work load amongst the partitions
-        distributeWorkLoad(filesOnly, partitionsCount);
+        distributeWorkLoad(filesOnly, getPartitionsCount());
     }
 
     /**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
index 358c412..3fb3395 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.input.record.reader.azure;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -43,15 +41,17 @@ import com.azure.storage.blob.models.BlobStorageException;
 public class AzureBlobInputStream extends AbstractExternalInputStream {
 
     private final BlobServiceClient client;
+    private final String container;
 
     public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
         super(configuration, filePaths);
         this.client = buildAzureClient(configuration);
+        this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
 
     @Override
     protected boolean getInputStream() throws IOException {
-        String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+        String fileName = filePaths.get(nextFileIndex);
         BlobContainerClient blobContainerClient;
         BlobClient blob;
         try {
@@ -60,9 +60,9 @@ public class AzureBlobInputStream extends AbstractExternalInputStream {
             in = blob.openInputStream();
 
             // Use gzip stream if needed
-            String filename = filePaths.get(nextFileIndex).toLowerCase();
-            if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
-                in = new GZIPInputStream(in = blob.openInputStream(), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+            String lowerCaseFileName = fileName.toLowerCase();
+            if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+                in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
             }
         } catch (BlobStorageException ex) {
             if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
index 167e22a..ca064b1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -18,19 +18,17 @@
  */
 package org.apache.asterix.external.input.record.reader.azure;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.*;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -56,10 +54,9 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact
     @Override
     public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
             throws AlgebricksException {
-        this.configuration = configuration;
-        ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+        super.configure(ctx, configuration, warningCollector);
 
-        String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 
         List<BlobItem> filesOnly = new ArrayList<>();
 
@@ -87,12 +84,8 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact
                 warningCollector.warn(warning);
             }
 
-            // Partition constraints
-            partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
-            int partitionsCount = partitionConstraint.getLocations().length;
-
             // Distribute work load amongst the partitions
-            distributeWorkLoad(filesOnly, partitionsCount);
+            distributeWorkLoad(filesOnly, getPartitionsCount());
         } catch (Exception ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fd5b269..45d15df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -288,6 +288,7 @@ public class ExternalDataConstants {
     public static final String INVALID_VAL = "invalid value";
 
     public static final String DEFINITION_FIELD_NAME = "definition";
+    public static final String CONTAINER_NAME_FIELD_NAME = "container";
 
     public static class AwsS3 {
         private AwsS3() {
@@ -298,7 +299,6 @@ public class ExternalDataConstants {
         public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
         public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
         public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
-        public static final String CONTAINER_NAME_FIELD_NAME = "container";
         public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
     }
 
@@ -307,8 +307,6 @@ public class ExternalDataConstants {
             throw new AssertionError("do not instantiate");
         }
 
-        public static final String CONTAINER_NAME_FIELD_NAME = "container";
-        public static final String DEFINITION_FIELD_NAME = "definition";
         public static final String CONNECTION_STRING_FIELD_NAME = "connectionString";
         public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
         public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 3ac1116..d1bbe89 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -776,7 +776,7 @@ public class ExternalDataUtils {
             S3Client s3Client = buildAwsS3Client(configuration);;
             S3Response response;
             boolean useOldApi = false;
-            String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
             String prefix = getPrefix(configuration);
 
             try {
@@ -943,7 +943,7 @@ public class ExternalDataUtils {
             // Check if the bucket is present
             BlobServiceClient blobServiceClient;
             try {
-                String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+                String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
                 blobServiceClient = buildAzureClient(configuration);
                 BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);