You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by pa...@apache.org on 2023/01/04 09:01:38 UTC

[inlong] branch master updated: [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)

This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e305ae869 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
e305ae869 is described below

commit e305ae8692832f592fde722c9143f0a13580ac8c
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Jan 4 17:01:31 2023 +0800

    [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
---
 .../sort/base/metric/sub/SinkTableMetricData.java  | 13 ++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  2 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 84 ++++++++++++++++++++--
 .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-------------
 .../sink/multiple/IcebergSingleStreamWriter.java   | 10 ++-
 5 files changed, 110 insertions(+), 59 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index a5690a5b5..842584ba7 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -256,6 +256,19 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
         subSinkMetricData.invokeDirty(rowCount, rowSize);
     }
 
+    /**
+     * output dirty metrics with estimate
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record
+     * @param table the table name of record
+     * @param data the dirty data
+     */
+    public void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) {
+        long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputDirtyMetrics(database, schema, table, 1, size);
+    }
+
     public void outputDirtyMetricsWithEstimate(Object data) {
         long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
         invokeDirty(1, size);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a5f070522..b0ed02abe 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -548,7 +548,7 @@ public class FlinkSink {
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
-                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink);
+                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink, inlongMetric, auditHostAndPorts);
             SingleOutputStreamOperator<RecordWithSchema> routeStream = input
                     .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 37bb6f944..53fef8dd1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -17,6 +17,12 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -43,9 +49,14 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.base.sink.TableChange;
 import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +72,12 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema>
         implements
             OneInputStreamOperator<RowData, RecordWithSchema>,
@@ -90,13 +107,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
+    // metric
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private @Nullable transient SinkTableMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
     public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
             MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink, String inlongMetric, String auditHostAndPorts) {
         this.catalogLoader = catalogLoader;
         this.multipleSinkOption = multipleSinkOption;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     @SuppressWarnings("unchecked")
@@ -117,6 +143,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         this.recordQueues = new HashMap<>();
         this.schemaCache = new HashMap<>();
         this.blacklist = new HashSet<>();
+
+        // Initialize metric
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
     }
 
     @Override
@@ -136,14 +176,15 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
             LOGGER.error(String.format("Deserialize error, raw data: %s",
                     new String(element.getValue().getBinary(0))), e);
             handleDirtyData(new String(element.getValue().getBinary(0)),
-                    null, DirtyType.DESERIALIZE_ERROR, e);
+                    null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow"));
         }
         TableIdentifier tableId = null;
         try {
             tableId = parseId(jsonNode);
         } catch (Exception e) {
             LOGGER.error(String.format("Table identifier parse error, raw data: %s", jsonNode), e);
-            handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e);
+            handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR,
+                    e, TableIdentifier.of("unknow", "unknow"));
         }
         if (blacklist.contains(tableId)) {
             return;
@@ -156,7 +197,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         }
     }
 
-    private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e) {
+    private void handleDirtyData(Object dirtyData,
+            JsonNode rootNode,
+            DirtyType dirtyType,
+            Exception e,
+            TableIdentifier tableId) {
         if (!dirtyOptions.ignoreDirty()) {
             RuntimeException ex;
             if (e instanceof RuntimeException) {
@@ -182,6 +227,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                             .setIdentifier(dirtyOptions.getIdentifier());
                 }
                 dirtySink.invoke(builder.build());
+                if (metricData != null) {
+                    metricData.outputDirtyMetricsWithEstimate(
+                            tableId.namespace().toString(), null, tableId.name(), dirtyData);
+                }
             } catch (Exception ex) {
                 if (!dirtyOptions.ignoreSideOutputErrors()) {
                     throw new RuntimeException(ex);
@@ -198,6 +247,29 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                 processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
     }
 
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            String.format(INLONG_METRIC_STATE_NAME), TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
     private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
         // todo:parse ddl sql
     }
@@ -242,7 +314,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                                 LOG.warn("Ignore table {} schema change, old: {} new: {}.",
                                         tableId, dataSchema, latestSchema, e);
                                 blacklist.add(tableId);
-                                handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e);
+                                handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
                             }
                             return Collections.emptyList();
                         });
@@ -329,7 +401,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                     tableId,
                     pkListStr);
         } catch (Exception e) {
-            handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e);
+            handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId);
         }
         return null;
     }
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 1abd5b97a..3022ec6c0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -38,13 +34,9 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
@@ -65,9 +57,7 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Iceberg writer that can distinguish different sink tables and route and distribute data into different
@@ -93,10 +83,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
     // metric
     private final String inlongMetric;
     private final String auditHostAndPorts;
-    @Nullable
-    private transient SinkMetricData metricData;
-    private transient ListState<MetricState> metricStateListState;
-    private transient MetricState metricState;
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
@@ -123,18 +109,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         this.multipleWriters = new HashMap<>();
         this.multipleTables = new HashMap<>();
         this.multipleSchemas = new HashMap<>();
-
-        // Initialize metric
-        MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
-                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
-                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
-                .withRegisterMetric(RegisteredMetric.ALL)
-                .build();
-        if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
-        }
     }
 
     @Override
@@ -202,9 +176,14 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
                     appendMode);
 
             if (multipleWriters.get(tableId) == null) {
+                StringBuilder subWriterInlongMetric = new StringBuilder(inlongMetric);
+                subWriterInlongMetric.append(DELIMITER)
+                        .append(Constants.DATABASE_NAME).append("=").append(tableId.namespace().toString())
+                        .append(DELIMITER)
+                        .append(Constants.TABLE_NAME).append("=").append(tableId.name());
                 IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
-                        tableId.toString(), taskWriterFactory, null,
-                        null, flinkRowType, dirtyOptions, dirtySink);
+                        tableId.toString(), taskWriterFactory, subWriterInlongMetric.toString(),
+                        auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink);
                 writer.setup(getRuntimeContext(),
                         new CallbackCollector<>(
                                 writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))),
@@ -223,9 +202,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         if (multipleWriters.get(tableId) != null) {
             for (RowData data : recordWithSchema.getData()) {
                 multipleWriters.get(tableId).processElement(data);
-                if (metricData != null) {
-                    metricData.invokeWithEstimate(data);
-                }
             }
         } else {
             LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
@@ -244,29 +220,11 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry : multipleWriters.entrySet()) {
             entry.getValue().snapshotState(context);
         }
-
-        // metric
-        if (metricData != null && metricStateListState != null) {
-            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
-                    getRuntimeContext().getIndexOfThisSubtask());
-        }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
         this.functionInitializationContext = context;
-
-        // init metric state
-        if (this.inlongMetric != null) {
-            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
-                    new ListStateDescriptor<>(
-                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
-                            })));
-        }
-        if (context.isRestored()) {
-            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
-                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
-        }
     }
 
     private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index c8bfbeb08..dc30c5b21 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -105,6 +107,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -138,6 +142,9 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
                             .setRowType(flinkRowType)
                             .setDirtyMessage(e.getMessage());
                     dirtySink.invoke(builder.build());
+                    if (metricData != null) {
+                        metricData.invokeDirtyWithEstimate(value);
+                    }
                 } catch (Exception ex) {
                     if (!dirtyOptions.ignoreSideOutputErrors()) {
                         throw new RuntimeException(ex);
@@ -157,7 +164,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
         if (this.inlongMetric != null) {
             this.metricStateListState = context.getOperatorStateStore().getUnionListState(
                     new ListStateDescriptor<>(
-                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            String.format("Iceberg(%s)-" + INLONG_METRIC_STATE_NAME, fullTableName),
+                            TypeInformation.of(new TypeHint<MetricState>() {
                             })));
         }
         if (context.isRestored()) {