You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "rajarshisarkar (via GitHub)" <gi...@apache.org> on 2023/04/27 05:31:20 UTC

[GitHub] [iceberg] rajarshisarkar opened a new pull request, #7444: AWS: Add SQS MetricsReporter

rajarshisarkar opened a new pull request, #7444:
URL: https://github.com/apache/iceberg/pull/7444

   This PR adds support for AWS SQS MetricsReporter.
   
   Spark SQL launch command:
   ```
   sh spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://<warehouse-path> \
       --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=org.apache.iceberg.aws.metrics.SqsMetricsReporter \
       --conf spark.sql.catalog.my_catalog.sqs.queue-url=<sqs-queue-url>
   ```
   
   TODO: SnsMetricsReporter and CloudWatchMetricsReporter
   
   ----
   
   cc: @jackye1995 @amogh-jahagirdar @singhpk234 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1215564973


##########
aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Before;
+import org.junit.Test;

Review Comment:
   Iceberg is in the process of migrating existing tests to Junit5, so would be good to make this a Junit5 test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1215564898


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterProperties.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Class to store the properties used by {@link SqsMetricsReporter} */
+public class SqsMetricsReporterProperties {
+
+  public static final String SQS_QUEUE_URL = "sqs.queue-url";

Review Comment:
   are we expecting more than this property to have here? Seems a bit expensive to have a separate class just for this single property



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#issuecomment-1580689840

   Introduced `SqsMetricsReporterAwsClientFactory`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178659114


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   Should we throw in case it's any other kind of message? That means we can't handle it properly (it'll end up being null)



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   I think we should throw in case it's any other kind of message? That means we can't handle it properly (it'll end up being null)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1180124330


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();

Review Comment:
   I have created the `SqsClientFactory` which refers `SqsMetricsReporterProperties`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223246133


##########
aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SqsMetricsReporterTest {
+
+  public SerializableSupplier<SqsAsyncClient> sqs;
+
+  private SqsMetricsReporter sqsMetricsReporter;
+
+  private CommitReport commitReport;
+
+  @BeforeAll
+  public void before() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metrics-reporter.sqs.queue-url", "test-sqs-queue");
+    properties.put(
+        HttpClientProperties.CLIENT_TYPE, HttpClientProperties.HTTP_CLIENT_TYPE_NETTYNIO);
+    sqs = () -> mock(SqsAsyncClient.class);
+    sqsMetricsReporter = new SqsMetricsReporter(sqs, "test-queue-url");
+    commitReport =
+        ImmutableCommitReport.builder()
+            .tableName("tableName")
+            .snapshotId(123L)
+            .operation(DataOperations.APPEND)
+            .sequenceNumber(123L)
+            .commitMetrics(
+                CommitMetricsResult.from(
+                    CommitMetrics.of(new DefaultMetricsContext()), Maps.newHashMap()))
+            .build();
+  }
+
+  @Test
+  public void report() {

Review Comment:
   nit: use `testXXX` for testing methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223242605


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SerializableSupplier<SqsAsyncClient> sqs;
+
+  private transient volatile SqsAsyncClient client;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SerializableSupplier<SqsAsyncClient> sqs, String sqsQueueUrl) {
+    this.sqs = sqs;
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+    if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+      this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+    }
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      CompletableFuture<SendMessageResponse> future =
+          client.sendMessage(
+              SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+      future.whenComplete(
+          (response, error) -> {
+            if (response != null) {
+              LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);
+            } else {
+              if (error != null) {
+                LOG.error("Failed to report metrics to SQS queue: {}", error.getMessage());
+              }
+            }
+          });
+    } catch (Exception e) {

Review Comment:
   can we at least use RuntimeException? Exception is too generic



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#issuecomment-1568008929

   > Feels like we should use the async client for this? We should not be blocked by metrics reporting calls
   
   Added `NettyNioAsyncHttpClient`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1180417442


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(sqsMetricsReporterProperties);
+    sqsClient = sqsClientFactory.sqs();
+    sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+    Preconditions.checkArgument(

Review Comment:
   should this rather be checked in `SqsMetricsReporterProperties`?



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(sqsMetricsReporterProperties);
+    sqsClient = sqsClientFactory.sqs();
+    sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", SqsMetricsReporterProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    } catch (Exception e) {
+      LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);
+    } finally {
+      sqsClient.close();

Review Comment:
   do we want to close the sqs client after every metrics report? Typically we would want to re-use a client I would have assumed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178656711


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();

Review Comment:
   Also just wanted to confirm with @nastra what is the expectation of implementors of MetricsReporter, are all MetricsReporter#report calls expected to be best effort (so wherever metricsReporter is plugged in, it catches all exceptions generically), or do implementors need to handle that themselves? I imagine the first case since generally metrics reporting shouldn't block the critical path but just wanted to make sure



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();

Review Comment:
   Shouldn't the httpClient be the httpClient that's provided by AwsProperties? Not just UrlConnectionHttpClient? Lmk if I'm missing something



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }
+
+    LOG.info("Received metrics report: {}", message);
+    sqsClient.sendMessage(
+        SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    sqsClient.close();

Review Comment:
   Can we put this in a `try/finally` to ensure that the sqsClient gets closed in case of failure when calling `sqs.sendMessage`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1180113586


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   Modified the logic to skip if the message is null. Also, introduced the `try-catch-finally` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223243401


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SerializableSupplier<SqsAsyncClient> sqs;
+
+  private transient volatile SqsAsyncClient client;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SerializableSupplier<SqsAsyncClient> sqs, String sqsQueueUrl) {
+    this.sqs = sqs;
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+    if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+      this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+    }
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      CompletableFuture<SendMessageResponse> future =
+          client.sendMessage(
+              SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+      future.whenComplete(
+          (response, error) -> {
+            if (response != null) {
+              LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);
+            } else {
+              if (error != null) {
+                LOG.error("Failed to report metrics to SQS queue: {}", error.getMessage());
+              }
+            }
+          });
+    } catch (Exception e) {
+      LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);

Review Comment:
   I think this deserves a LOG.error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223240339


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SerializableSupplier<SqsAsyncClient> sqs;
+
+  private transient volatile SqsAsyncClient client;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SerializableSupplier<SqsAsyncClient> sqs, String sqsQueueUrl) {
+    this.sqs = sqs;
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+    if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+      this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+    }
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {

Review Comment:
   this can be just `else` clause of the last clause? The parsers cannot return null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1221486973


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {

Review Comment:
   Added `SerializableSupplier<SqsAsyncClient> sqs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178660241


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   Or if it's an unknown MetricsReport type, we skip the sending of the message in the implementation itself and log warn that we are skipping it. Actually that seems better to me, since I think this should be best effort. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1218139335


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SqsAsyncClient sqsClient, String sqsQueueUrl) {
+    this.sqsClient = sqsClient;
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(properties);
+    this.sqsClient = sqsClientFactory.sqs();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    } catch (Exception e) {
+      LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);

Review Comment:
   Yes, @nastra. I will retain the try-catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214854222


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;

Review Comment:
   +1 need to make it serializable like s3 in s3FileIO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1221475741


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+/** Factory class for SQS client */
+public class SqsClientFactory implements Serializable {

Review Comment:
   I have introduced `SqsMetricsReporterAwsClientFactory` interface.



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SqsAsyncClient sqsClient, String sqsQueueUrl) {
+    this.sqsClient = sqsClient;
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(properties);
+    this.sqsClient = sqsClientFactory.sqs();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());

Review Comment:
   Added the callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1222201600


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SerializableSupplier<SqsAsyncClient> sqs;
+
+  private transient volatile SqsAsyncClient client;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SerializableSupplier<SqsAsyncClient> sqs, String sqsQueueUrl) {
+    this.sqs = sqs;
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+    if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+      this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+    }
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      CompletableFuture<SendMessageResponse> future =
+          client.sendMessage(
+              SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+      future.whenComplete(
+          (response, error) -> {
+            if (response != null) {
+              LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);
+            } else {
+              LOG.error("Failed to report metrics to SQS queue: {}", error.getMessage());

Review Comment:
   the error case should use condition `(error != null)`



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SerializableSupplier<SqsAsyncClient> sqs;
+
+  private transient volatile SqsAsyncClient client;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SerializableSupplier<SqsAsyncClient> sqs, String sqsQueueUrl) {
+    this.sqs = sqs;
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+    if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+      this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+    }
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      CompletableFuture<SendMessageResponse> future =
+          client.sendMessage(
+              SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+      future.whenComplete(
+          (response, error) -> {
+            if (response != null) {
+              LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);
+            } else {
+              LOG.error("Failed to report metrics to SQS queue: {}", error.getMessage());
+            }
+          });
+      future.join();

Review Comment:
   I don't think we need to join the future, it will wait for completion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1218137580


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterProperties.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Class to store the properties used by {@link SqsMetricsReporter} */
+public class SqsMetricsReporterProperties {
+
+  public static final String SQS_QUEUE_URL = "sqs.queue-url";

Review Comment:
   `metrics-reporter.sqs.client-factory-impl` should also come here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223237313


##########
aws/src/main/java/org/apache/iceberg/aws/SqsMetricsReporterAwsClientFactories.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Map;
+import org.apache.iceberg.aws.metrics.DefaultSqsMetricsReporterAwsClientFactory;
+import org.apache.iceberg.aws.metrics.SqsMetricsReporterAwsClientFactory;
+import org.apache.iceberg.aws.metrics.SqsMetricsReporterProperties;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class SqsMetricsReporterAwsClientFactories {
+
+  private SqsMetricsReporterAwsClientFactories() {}
+
+  /**
+   * Attempts to load an AWS client factory class for SQS Metrics Reporter defined in the catalog
+   * property {@link SqsMetricsReporterProperties#METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL}. If the
+   * property wasn't set, fallback to {@link AwsClientFactories#from(Map) to intialize an AWS client
+   * factory class}
+   *
+   * @param properties catalog properties
+   * @return an instance of a factory class
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T initialize(Map<String, String> properties) {
+    String factoryImpl =
+        PropertyUtil.propertyAsString(
+            properties,
+            SqsMetricsReporterProperties.METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL,
+            DefaultSqsMetricsReporterAwsClientFactory.class.getName());

Review Comment:
   similar to the old client factory, we should be able to make `DefaultSqsMetricsReporterAwsClientFactory` a singleton, and directly return that if not set.



##########
aws/src/main/java/org/apache/iceberg/aws/SqsMetricsReporterAwsClientFactories.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Map;
+import org.apache.iceberg.aws.metrics.DefaultSqsMetricsReporterAwsClientFactory;
+import org.apache.iceberg.aws.metrics.SqsMetricsReporterAwsClientFactory;
+import org.apache.iceberg.aws.metrics.SqsMetricsReporterProperties;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class SqsMetricsReporterAwsClientFactories {
+
+  private SqsMetricsReporterAwsClientFactories() {}
+
+  /**
+   * Attempts to load an AWS client factory class for SQS Metrics Reporter defined in the catalog
+   * property {@link SqsMetricsReporterProperties#METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL}. If the
+   * property wasn't set, fallback to {@link AwsClientFactories#from(Map) to intialize an AWS client
+   * factory class}
+   *
+   * @param properties catalog properties
+   * @return an instance of a factory class
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T initialize(Map<String, String> properties) {
+    String factoryImpl =
+        PropertyUtil.propertyAsString(
+            properties,
+            SqsMetricsReporterProperties.METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL,
+            DefaultSqsMetricsReporterAwsClientFactory.class.getName());

Review Comment:
   similar to the old client factory, we should be able to make `DefaultSqsMetricsReporterAwsClientFactory` a singleton, and directly return that if `METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL` is not set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223237941


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/DefaultSqsMetricsReporterAwsClientFactory.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+public class DefaultSqsMetricsReporterAwsClientFactory

Review Comment:
   this can be an inner package protected class in the factories class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214856196


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SqsAsyncClient sqsClient, String sqsQueueUrl) {
+    this.sqsClient = sqsClient;
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(properties);
+    this.sqsClient = sqsClientFactory.sqs();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);

Review Comment:
   I don't think we need to log it here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178665979


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {

Review Comment:
   just a general FYI that we're aiming for making all `MetricsReporter` instances fully `Serializable` as part of https://github.com/apache/iceberg/pull/7370. So would be good to consider this here as well. 



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }
+
+    LOG.info("Received metrics report: {}", message);
+    sqsClient.sendMessage(
+        SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    sqsClient.close();

Review Comment:
   +1 on wrapping this in a try-finally. See also my comment above



##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   typically the expectation of a `MetricsReporter` is that it doesn't throw when `report` is called as otherwise it would interfere with scans/commits.
   For example, the `RESTMetricsReporter` exits early if the `report` ever turns out to be null (which technically shouldn't happen) and executes all operations in a `try-catch`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1221386025


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+/** Factory class for SQS client */
+public class SqsClientFactory implements Serializable {

Review Comment:
   +1 on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1215563901


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SqsAsyncClient sqsClient, String sqsQueueUrl) {
+    this.sqsClient = sqsClient;
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(properties);
+    this.sqsClient = sqsClientFactory.sqs();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    } catch (Exception e) {
+      LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);

Review Comment:
   we should keep the try-catch, because we don't want to interfere with the reading/writing itself if anything goes wrong during reporting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214852033


##########
aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java:
##########
@@ -167,6 +168,90 @@ public class HttpClientProperties implements Serializable {
   public static final String APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
       "http-client.apache.use-idle-connection-reaper-enabled";
 
+  /**
+   * If this is set under {@link #CLIENT_TYPE}, {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient} will be used as the HTTP Client
+   */
+  public static final String HTTP_CLIENT_TYPE_NETTYNIO = "nettynio";

Review Comment:
   nit: `netty` seems sufficient instead of `nettynio`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1180117454


##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -760,6 +763,8 @@ public class AwsProperties implements Serializable {
   private String restSecretAccessKey;
   private String restSessionToken;
 
+  private String sqsQueueUrl;

Review Comment:
   Started to separate out the properties. I have kept a base class for common properties across all clients.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178668077


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }
+
+    LOG.info("Received metrics report: {}", message);
+    sqsClient.sendMessage(
+        SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    sqsClient.close();

Review Comment:
   the general expectation is that there shouldn't be any exceptions bubbling up from the `report` method as otherwise we'll be interfering with scans/commits, meaning that every `MetricsReporter` impl needs to make sure to wrap everything in a `try-catch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214855109


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterProperties.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Class to store the properties used by {@link SqsMetricsReporter} */
+public class SqsMetricsReporterProperties {
+
+  public static final String SQS_QUEUE_URL = "sqs.queue-url";

Review Comment:
   what about `metrics-reporter.sqs.queue-url`? to be more specific



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1180112350


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }
+
+    LOG.info("Received metrics report: {}", message);
+    sqsClient.sendMessage(
+        SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    sqsClient.close();

Review Comment:
   I have introduced the `try-catch-finally` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178660241


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   Or we skip the sending of the message in the implementation itself. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178659108


##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -760,6 +763,8 @@ public class AwsProperties implements Serializable {
   private String restSecretAccessKey;
   private String restSessionToken;
 
+  private String sqsQueueUrl;

Review Comment:
   Start from this, let's separate the properties from `AwsProperties` and put it in a different class like `SqsMetricsReporterProperties`. We will gradually also separate other properties to complete the refactoring so that we no longer depend on the client factory for all AWS clients



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#issuecomment-1525726860

   Feels like we should use the async client for this? We should not be blocked by metrics reporting calls


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223247151


##########
aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SqsMetricsReporterTest {
+
+  public SerializableSupplier<SqsAsyncClient> sqs;
+
+  private SqsMetricsReporter sqsMetricsReporter;
+
+  private CommitReport commitReport;
+
+  @BeforeAll
+  public void before() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metrics-reporter.sqs.queue-url", "test-sqs-queue");
+    properties.put(
+        HttpClientProperties.CLIENT_TYPE, HttpClientProperties.HTTP_CLIENT_TYPE_NETTYNIO);
+    sqs = () -> mock(SqsAsyncClient.class);
+    sqsMetricsReporter = new SqsMetricsReporter(sqs, "test-queue-url");
+    commitReport =
+        ImmutableCommitReport.builder()
+            .tableName("tableName")
+            .snapshotId(123L)
+            .operation(DataOperations.APPEND)
+            .sequenceNumber(123L)
+            .commitMetrics(
+                CommitMetricsResult.from(
+                    CommitMetrics.of(new DefaultMetricsContext()), Maps.newHashMap()))
+            .build();
+  }
+
+  @Test
+  public void report() {
+    sqsMetricsReporter.report(commitReport);

Review Comment:
   can we test both scan and commit reports, and more test cases for success and failure cases that you are catching in the implementation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178657313


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }
+
+    LOG.info("Received metrics report: {}", message);
+    sqsClient.sendMessage(
+        SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    sqsClient.close();

Review Comment:
   Also just wanted to confirm with @nastra what is the expectation of implementors of MetricsReporter, are all MetricsReporter#report calls expected to be best effort (so wherever metricsReporter is plugged in, it catches all exceptions generically), or do implementors need to handle that themselves? I imagine the first case since generally metrics reporting shouldn't block the critical path but just wanted to make sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178659114


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }

Review Comment:
   I think we should throw in case it's any other kind of message? That means we can't handle it properly (currently, it'll end up being null, which is a bug)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1204098636


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(sqsMetricsReporterProperties);
+    sqsClient = sqsClientFactory.sqs();
+    sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+    Preconditions.checkArgument(

Review Comment:
   Yes, that should be right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214857201


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SqsAsyncClient sqsClient, String sqsQueueUrl) {
+    this.sqsClient = sqsClient;
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(properties);
+    this.sqsClient = sqsClientFactory.sqs();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    } catch (Exception e) {
+      LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);

Review Comment:
   since it's non-blocking async client, this try catch is not useful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214856600


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsAsyncClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SqsAsyncClient sqsClient, String sqsQueueUrl) {
+    this.sqsClient = sqsClient;
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(properties);
+    this.sqsClient = sqsClientFactory.sqs();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());

Review Comment:
   we should add a callback to log an error if send message fails



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214860560


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+/** Factory class for SQS client */
+public class SqsClientFactory implements Serializable {

Review Comment:
   Also we want the same pattern as S3 to have a `Factories` so users can use their own factory if necessary



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214852033


##########
aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java:
##########
@@ -167,6 +168,90 @@ public class HttpClientProperties implements Serializable {
   public static final String APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
       "http-client.apache.use-idle-connection-reaper-enabled";
 
+  /**
+   * If this is set under {@link #CLIENT_TYPE}, {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient} will be used as the HTTP Client
+   */
+  public static final String HTTP_CLIENT_TYPE_NETTYNIO = "nettynio";

Review Comment:
   nit: `netty` seems sufficient instead of `nettynio`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1204099125


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    SqsClientFactory sqsClientFactory = new SqsClientFactory(sqsMetricsReporterProperties);
+    sqsClient = sqsClientFactory.sqs();
+    sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", SqsMetricsReporterProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      LOG.info("Received metrics report: {}", message);
+      sqsClient.sendMessage(
+          SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    } catch (Exception e) {
+      LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);
+    } finally {
+      sqsClient.close();

Review Comment:
   Removed the client close.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1214851337


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+/** Factory class for SQS client */
+public class SqsClientFactory implements Serializable {

Review Comment:
   I think this should be `SqsMetricsReporterAwsClientFactory`, similar to the `S3FileIOAwsClientFactory`, each functional entity gets its own client factory, not each AWS client has its own client factory. What do you think?
   
   https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/S3FileIOAwsClientFactories.java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1221476553


##########
aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Before;
+import org.junit.Test;

Review Comment:
   Migrated to Junit5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178653922


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();
+    sqsQueueUrl = awsProperties.sqsQueueUrl();
+    Preconditions.checkArgument(
+        null != sqsQueueUrl, "%s should be be set", AwsProperties.SQS_QUEUE_URL);
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    String message = null;
+    if (report instanceof CommitReport) {
+      message = CommitReportParser.toJson((CommitReport) report);
+    } else if (report instanceof ScanReport) {
+      message = ScanReportParser.toJson((ScanReport) report);
+    }
+
+    LOG.info("Received metrics report: {}", message);
+    sqsClient.sendMessage(
+        SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+    sqsClient.close();

Review Comment:
   Can we put this in a `try/finally` to ensure that the sqsClient gets closed in case of failure when calling `sqs.sendMessage`? Also I think a log line after the sending of the message would be useful information.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178660328


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SqsClient sqsClient;
+
+  private String sqsQueueUrl;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    AwsProperties awsProperties = new AwsProperties(properties);
+    sqsClient = SqsClient.builder().httpClient(UrlConnectionHttpClient.builder().build()).build();

Review Comment:
   I am thinking about doing a refactoring of the entire AWS module, where we have different properties classes for each important object (S3FileIO, GlueCatalog, etc.) and different client factories for each client. That fixes the issue that users today need to bring all related AWS clients just to use one object. We can start from this object and create an `SqsClientFactory` for related client configs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] nastra commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1178675963


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {

Review Comment:
   In order to fully test that e.g. the `RESTMetricsReporter` is serializable, I've added https://github.com/apache/iceberg/pull/7370/files#diff-37f64b622d5c3e0e890d94773e84ef49ceb3c1e9b0c03da313db9266b0e75afbR1756-R1797. Maybe something similar could be added when testing whether `SqsMetricsReporter` is serializable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rajarshisarkar commented on pull request #7444: AWS: Add SQS MetricsReporter

Posted by "rajarshisarkar (via GitHub)" <gi...@apache.org>.
rajarshisarkar commented on PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#issuecomment-1527208577

   > Feels like we should use the async client for this? We should not be blocked by metrics reporting calls
   
   This would mean we introduce the AsyncHttp clients too: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration.html#http-clients-available. Thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223235515


##########
aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java:
##########
@@ -167,6 +168,90 @@ public class HttpClientProperties implements Serializable {
   public static final String APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
       "http-client.apache.use-idle-connection-reaper-enabled";
 
+  /**
+   * If this is set under {@link #CLIENT_TYPE}, {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient} will be used as the HTTP Client
+   */
+  public static final String HTTP_CLIENT_TYPE_NETTYNIO = "nettynio";
+
+  /**
+   * Used to configure connection maximum idle time {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_CONNECTION_MAX_IDLE_TIME_MS =
+      "http-client.nettynio.connection-max-idle-time-ms";
+  /**
+   * Used to configure connection acquisition timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_ACQUISITION_TIMEOUT_MS =
+      "http-client.nettynio.connection-acquisition-timeout-ms";
+  /**
+   * Used to configure connection timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_CONNECTION_TIMEOUT_MS =
+      "http-client.nettynio.connection-timeout-ms";
+  /**
+   * Used to configure connection time to live {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_CONNECTION_TIME_TO_LIVE_MS =
+      "http-client.nettynio.connection-time-to-live-ms";
+  /**
+   * Used to configure maximum concurrency {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_MAX_CONCURRENCY = "http-client.nettynio.max-concurrency";
+  /**
+   * Used to configure read timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_READ_TIMEOUT = "http-client.nettynio.read-timeout";
+  /**
+   * Used to configure maximum pending connection acquires {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_MAX_PENDING_CONNECTION_ACQUIRES =
+      "http-client.nettynio.max-pending-connection-acquires";
+  /**
+   * Used to configure write timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_WRITE_TIMEOUT = "http-client.nettynio.write-timeout";

Review Comment:
   nit: can you add time units to this, like `-ms`?



##########
aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java:
##########
@@ -167,6 +168,90 @@ public class HttpClientProperties implements Serializable {
   public static final String APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
       "http-client.apache.use-idle-connection-reaper-enabled";
 
+  /**
+   * If this is set under {@link #CLIENT_TYPE}, {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient} will be used as the HTTP Client
+   */
+  public static final String HTTP_CLIENT_TYPE_NETTYNIO = "nettynio";
+
+  /**
+   * Used to configure connection maximum idle time {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_CONNECTION_MAX_IDLE_TIME_MS =
+      "http-client.nettynio.connection-max-idle-time-ms";
+  /**
+   * Used to configure connection acquisition timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_ACQUISITION_TIMEOUT_MS =
+      "http-client.nettynio.connection-acquisition-timeout-ms";
+  /**
+   * Used to configure connection timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_CONNECTION_TIMEOUT_MS =
+      "http-client.nettynio.connection-timeout-ms";
+  /**
+   * Used to configure connection time to live {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_CONNECTION_TIME_TO_LIVE_MS =
+      "http-client.nettynio.connection-time-to-live-ms";
+  /**
+   * Used to configure maximum concurrency {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_MAX_CONCURRENCY = "http-client.nettynio.max-concurrency";
+  /**
+   * Used to configure read timeout {@link
+   * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+   * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+   *
+   * <p>For more details, see
+   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+   */
+  public static final String NETTYNIO_READ_TIMEOUT = "http-client.nettynio.read-timeout";

Review Comment:
   nit: can you add time units to this, like `-ms`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7444: AWS: Add SQS MetricsReporter

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7444:
URL: https://github.com/apache/iceberg/pull/7444#discussion_r1223245572


##########
aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+  private SerializableSupplier<SqsAsyncClient> sqs;
+
+  private transient volatile SqsAsyncClient client;
+
+  private String sqsQueueUrl;
+
+  public SqsMetricsReporter() {}
+
+  public SqsMetricsReporter(SerializableSupplier<SqsAsyncClient> sqs, String sqsQueueUrl) {
+    this.sqs = sqs;
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsQueueUrl;
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    SqsMetricsReporterProperties sqsMetricsReporterProperties =
+        new SqsMetricsReporterProperties(properties);
+    Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+    if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+      this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+    }
+    this.client = sqs.get();
+    this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+  }
+
+  @Override
+  public void report(MetricsReport report) {
+    if (null == report) {
+      LOG.warn("Received invalid metrics report: null");
+      return;
+    }
+
+    try {
+      String message = null;
+      if (report instanceof CommitReport) {
+        message = CommitReportParser.toJson((CommitReport) report);
+      } else if (report instanceof ScanReport) {
+        message = ScanReportParser.toJson((ScanReport) report);
+      }
+
+      if (null == message) {
+        LOG.warn("Received unknown MetricsReport type");
+        return;
+      }
+
+      CompletableFuture<SendMessageResponse> future =
+          client.sendMessage(
+              SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+      future.whenComplete(
+          (response, error) -> {
+            if (response != null) {
+              LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);

Review Comment:
   I don't think we need to log the response? Just logging error when it exists seems sufficient to me, what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org