Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/14 09:25:16 UTC

[GitHub] [flink] nuno-c-afonso commented on a diff in pull request #19937: [FLINK-28007][connectors/kinesis,firehose] Migrated Kinesis Firehose & Streams …

nuno-c-afonso commented on code in PR #19937:
URL: https://github.com/apache/flink/pull/19937#discussion_r896588637


##########
flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java:
##########
@@ -97,77 +94,51 @@ public static Properties createConfig(String endpoint) {
         return config;
     }
 
-    public static SdkAsyncHttpClient createHttpClient(String endpoint) {
-        return AWSGeneralUtil.createAsyncHttpClient(
-                createConfig(endpoint),
-                NettyNioAsyncHttpClient.builder()
-                        .eventLoopGroupBuilder(SdkEventLoopGroup.builder()));
+    public static SdkHttpClient createHttpClient() {
+        AttributeMap.Builder attributeMapBuilder = AttributeMap.builder();
+        attributeMapBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true);
+        attributeMapBuilder.put(SdkHttpConfigurationOption.PROTOCOL, Protocol.HTTP1_1);
+        return ApacheHttpClient.builder().buildWithDefaults(attributeMapBuilder.build());
     }
 
-    public static void createBucket(S3AsyncClient s3Client, String bucketName)
-            throws ExecutionException, InterruptedException {
+    public static void createBucket(S3Client s3Client, String bucketName) {
         CreateBucketRequest bucketRequest =
                 CreateBucketRequest.builder().bucket(bucketName).build();
         s3Client.createBucket(bucketRequest);
 
         HeadBucketRequest bucketRequestWait =
                 HeadBucketRequest.builder().bucket(bucketName).build();
 
-        try (final S3AsyncWaiter waiter = s3Client.waiter()) {
-            waiter.waitUntilBucketExists(bucketRequestWait).get();
+        try (final S3Waiter waiter = s3Client.waiter()) {
+            waiter.waitUntilBucketExists(bucketRequestWait);
         }
     }
 
-    public static void createIAMRole(IamAsyncClient iam, String roleName)
-            throws ExecutionException, InterruptedException {
+    public static void createIAMRole(IamClient iam, String roleName) {
         CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
 
-        CompletableFuture<CreateRoleResponse> responseFuture = iam.createRole(request);
-        responseFuture.get();
+        iam.createRole(request);
     }
 
-    public static List<S3Object> listBucketObjects(S3AsyncClient s3, String bucketName)
-            throws ExecutionException, InterruptedException {
-        ListObjectsRequest listObjects = ListObjectsRequest.builder().bucket(bucketName).build();
-        CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects);
-        return res.get().contents();
+    public static List<S3Object> listBucketObjects(S3Client s3, String bucketName) {
+        ListObjectsV2Request listObjects =
+                ListObjectsV2Request.builder().bucket(bucketName).build();
+        ListObjectsV2Response res = s3.listObjectsV2(listObjects);
+        return res.contents();
     }
 
     public static <T> List<T> readObjectsFromS3Bucket(
-            S3AsyncClient s3AsyncClient,
+            S3Client s3Client,
             List<S3Object> objects,
             String bucketName,
             Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
-        S3BucketReader bucketReader = new S3BucketReader(s3AsyncClient, bucketName);
-        return bucketReader.readObjects(objects, deserializer);
-    }
-
-    /** Helper class to read objects from S3. */
-    private static class S3BucketReader {
-        private final S3AsyncClient s3AsyncClient;
-        private final String bucketName;
-
-        public S3BucketReader(S3AsyncClient s3AsyncClient, String bucketName) {
-            this.s3AsyncClient = s3AsyncClient;
-            this.bucketName = bucketName;
-        }
-
-        public <T> List<T> readObjects(
-                List<S3Object> objectList,
-                Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
-            return objectList.stream()
-                    .map(object -> readObjectWitKey(object.key(), deserializer))
-                    .collect(Collectors.toList());
-        }
-
-        public <T> T readObjectWitKey(
-                String key, Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
-            GetObjectRequest getObjectRequest =
-                    GetObjectRequest.builder().bucket(bucketName).key(key).build();
-            return s3AsyncClient
-                    .getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
-                    .thenApply(deserializer)
-                    .join();
+        List<T> results = new ArrayList<>();
+        for (S3Object object : objects) {
+            GetObjectRequest request =
+                    GetObjectRequest.builder().bucket(bucketName).key(object.key()).build();
+            ResponseBytes<GetObjectResponse> objectAsBytes = s3Client.getObjectAsBytes(request);
+            results.add(deserializer.apply(objectAsBytes));
         }
+        return results;

Review Comment:
   I think the previous `stream()`-based version read a bit more cleanly than this explicit loop.
   
   We could do something like:
   ```
   GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder().bucket(bucketName);
   return objects.stream()
                 .map(object -> getObjectRequestBuilder.key(object.key()).build())
                 .map(s3Client::getObjectAsBytes)
                 .map(deserializer)
                 .collect(Collectors.toList());
   ```
   
   What do you think?
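
To make the comparison concrete without pulling in the AWS SDK, here is a minimal, self-contained sketch of the two shapes under discussion. Everything in it is a stand-in: `ReadObjectsSketch`, `readWithLoop`, `readWithStream`, and the `fetch` function (which plays the role of `s3Client::getObjectAsBytes`) are hypothetical names for illustration, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ReadObjectsSketch {

    // Loop form, mirroring the shape of the new code in the diff.
    // `fetch` is a hypothetical stand-in for s3Client::getObjectAsBytes.
    public static <R, T> List<T> readWithLoop(
            List<String> keys, Function<String, R> fetch, Function<R, T> deserializer) {
        List<T> results = new ArrayList<>();
        for (String key : keys) {
            results.add(deserializer.apply(fetch.apply(key)));
        }
        return results;
    }

    // Stream form, mirroring the shape suggested in the review comment.
    // A Function can be passed to map() directly, so no `::apply` is needed.
    public static <R, T> List<T> readWithStream(
            List<String> keys, Function<String, R> fetch, Function<R, T> deserializer) {
        return keys.stream()
                .map(fetch)
                .map(deserializer)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        // Fake fetch: returns a string payload instead of calling S3.
        Function<String, String> fakeFetch = key -> "payload-" + key;
        Function<String, Integer> length = String::length;

        System.out.println(readWithLoop(keys, fakeFetch, length));
        System.out.println(readWithStream(keys, fakeFetch, length));
    }
}
```

Both methods return the same list in the same order for a sequential stream, so the choice is mostly about readability. One caveat with the suggestion above: it reuses a single mutable request builder inside `map`, which works for a sequential stream but would not be safe if the stream were ever made parallel; building the request inside the lambda avoids that.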


