Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/17 00:00:32 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request, #5554: Update metrics group to be more consistent as other connector metrics.

stevenzwu opened a new pull request, #5554:
URL: https://github.com/apache/iceberg/pull/5554

   Basically, IcebergSourceReader will become part of the metric name, and the table key-value pair will become a tag. Mason Chen suggested this in the comment below so that the metrics are more consistent with other Flink connector metrics.
   
   https://github.com/apache/iceberg/pull/5410#discussion_r946260023
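
   A minimal sketch of the naming scheme described above. `SketchGroup` is a hypothetical stand-in, not Flink's `MetricGroup`; it assumes that a plain `addGroup(name)` call adds one scope component, while `addGroup(key, value)` adds both components and also records the pair as a variable that reporters can expose as a tag.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a metric group; this is NOT Flink's implementation. */
final class SketchGroup {
  private final List<String> scope;
  private final Map<String, String> variables;

  SketchGroup() {
    this(new ArrayList<>(), new LinkedHashMap<>());
  }

  private SketchGroup(List<String> scope, Map<String, String> variables) {
    this.scope = scope;
    this.variables = variables;
  }

  /** Adds a plain name component to the scope; variables are copied unchanged. */
  SketchGroup addGroup(String name) {
    List<String> childScope = new ArrayList<>(scope);
    childScope.add(name);
    return new SketchGroup(childScope, new LinkedHashMap<>(variables));
  }

  /** Adds key and value to the scope and records the pair as a variable (tag). */
  SketchGroup addGroup(String key, String value) {
    SketchGroup child = addGroup(key).addGroup(value);
    child.variables.put("<" + key + ">", value);
    return child;
  }

  /** Joins the scope components and the metric name with dots. */
  String metricIdentifier(String metricName) {
    List<String> parts = new ArrayList<>(scope);
    parts.add(metricName);
    return String.join(".", parts);
  }

  Map<String, String> allVariables() {
    return variables;
  }
}

final class MetricNamingSketch {
  public static void main(String[] args) {
    // Mirrors the chain used in this PR: a name group, then a key-value group.
    SketchGroup group =
        new SketchGroup().addGroup("IcebergSourceReader").addGroup("table", "db.tbl");
    System.out.println(group.metricIdentifier("assignedSplits"));
    System.out.println(group.allVariables());
  }
}
```

   In this model the reader name is a fixed part of the scope, and the table name travels as a variable, so a reporter that supports tags can filter or group by table.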


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r949317174


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final GenericAppenderFactory appenderFactory;
+
+  public TestIcebergSourceReader() {
+    this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testReaderMetrics() throws Exception {
+    TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    reader.start();
+
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
+  }
+
+  private void testOneSplitFetcher(
+      IcebergSourceReader reader,
+      TestingReaderOutput<RowData> readerOutput,
+      TestingMetricGroup metricGroup,
+      int expectedCount)
+      throws Exception {
+    long seed = expectedCount;
+    // Each split should contain only one file with one record
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
+    reader.addSplits(Arrays.asList(split));
+
+    while (readerOutput.getEmittedRecords().size() < expectedCount) {
+      reader.pollNext(readerOutput);
+    }
+
+    Assert.assertEquals(expectedCount, readerOutput.getEmittedRecords().size());
+    TestHelpers.assertRowData(
+        TestFixtures.SCHEMA,
+        recordBatchList.get(0).get(0),
+        readerOutput.getEmittedRecords().get(expectedCount - 1));
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("assignedSplits").getCount());
+
+    // One more poll will get null record batch.
+    // That will finish the split and cause split fetcher to be closed due to idleness.
+    // Then next split will create a new split reader.
+    reader.pollNext(readerOutput);
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("finishedSplits").getCount());
+  }
+
+  private IcebergSourceReader createReader(
+      MetricGroup metricGroup, SourceReaderContext readerContext) {
+    IcebergSourceReaderMetrics readerMetrics =
+        new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            TestFixtures.SCHEMA,
+            TestFixtures.SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager());
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
+  }
+
+  private static class TestingMetricGroup extends UnregisteredMetricsGroup

Review Comment:
   I don't know whether it will be used by other tests. I typically prefer to refactor later, when the need arises. But `TestingMetricGroup` has grown big enough to be a stand-alone class, so I moved it to its own file. It is package-private for now.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948675886


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final GenericAppenderFactory appenderFactory;
+
+  public TestIcebergSourceReader() {
+    this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testReaderMetrics() throws Exception {
+    TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    reader.start();
+
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
+  }
+
+  private void testOneSplitFetcher(
+      IcebergSourceReader reader,
+      TestingReaderOutput<RowData> readerOutput,
+      TestingMetricGroup metricGroup,
+      int expectedCount)
+      throws Exception {
+    long seed = expectedCount;
+    // Each split should contain only one file with one record
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
+    reader.addSplits(Arrays.asList(split));
+
+    while (readerOutput.getEmittedRecords().size() < expectedCount) {
+      reader.pollNext(readerOutput);
+    }
+
+    Assert.assertEquals(expectedCount, readerOutput.getEmittedRecords().size());
+    TestHelpers.assertRowData(
+        TestFixtures.SCHEMA,
+        recordBatchList.get(0).get(0),
+        readerOutput.getEmittedRecords().get(expectedCount - 1));
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("assignedSplits").getCount());
+
+    // One more poll will get null record batch.
+    // That will finish the split and cause split fetcher to be closed due to idleness.
+    // Then next split will create a new split reader.
+    reader.pollNext(readerOutput);
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("finishedSplits").getCount());
+  }
+
+  private IcebergSourceReader createReader(
+      MetricGroup metricGroup, SourceReaderContext readerContext) {
+    IcebergSourceReaderMetrics readerMetrics =
+        new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            TestFixtures.SCHEMA,
+            TestFixtures.SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager());
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
+  }
+
+  private static class TestingMetricGroup extends UnregisteredMetricsGroup
+      implements SourceReaderMetricGroup {
+    private final Map<String, Counter> counters;
+
+    private TestingMetricGroup() {
+      this.counters = Maps.newHashMap();
+    }
+
+    /** Pass along the reference to share the map for child metric groups. */

Review Comment:
   This is the tricky part, because `IcebergSourceReaderMetrics` calls `addGroup` to create a child metric group. We need to share the reference so that we can hijack the counter creation from the child group.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948675886


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final GenericAppenderFactory appenderFactory;
+
+  public TestIcebergSourceReader() {
+    this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testReaderMetrics() throws Exception {
+    TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    reader.start();
+
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
+  }
+
+  private void testOneSplitFetcher(
+      IcebergSourceReader reader,
+      TestingReaderOutput<RowData> readerOutput,
+      TestingMetricGroup metricGroup,
+      int expectedCount)
+      throws Exception {
+    long seed = expectedCount;
+    // Each split should contain only one file with one record
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
+    reader.addSplits(Arrays.asList(split));
+
+    while (readerOutput.getEmittedRecords().size() < expectedCount) {
+      reader.pollNext(readerOutput);
+    }
+
+    Assert.assertEquals(expectedCount, readerOutput.getEmittedRecords().size());
+    TestHelpers.assertRowData(
+        TestFixtures.SCHEMA,
+        recordBatchList.get(0).get(0),
+        readerOutput.getEmittedRecords().get(expectedCount - 1));
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("assignedSplits").getCount());
+
+    // One more poll will get null record batch.
+    // That will finish the split and cause split fetcher to be closed due to idleness.
+    // Then next split will create a new split reader.
+    reader.pollNext(readerOutput);
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("finishedSplits").getCount());
+  }
+
+  private IcebergSourceReader createReader(
+      MetricGroup metricGroup, SourceReaderContext readerContext) {
+    IcebergSourceReaderMetrics readerMetrics =
+        new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            TestFixtures.SCHEMA,
+            TestFixtures.SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager());
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
+  }
+
+  private static class TestingMetricGroup extends UnregisteredMetricsGroup
+      implements SourceReaderMetricGroup {
+    private final Map<String, Counter> counters;
+
+    private TestingMetricGroup() {
+      this.counters = Maps.newHashMap();
+    }
+
+    /** Pass along the reference to share the map for child metric groups. */

Review Comment:
   This is the tricky part, because `IcebergSourceReaderMetrics` calls `addGroup` to create a child metric group. We need to share the reference so that we can hijack the counter creation from the child group. I verified that the unit test failed before the fix and passed after the fix.
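
   The shared-reference idea can be sketched roughly as follows. `RecordingGroup` and `SketchCounter` are hypothetical stand-ins, not the `TestingMetricGroup` from this PR and not Flink classes; the only point is that every child group receives the parent's map instead of a fresh one, so a counter created on a child is visible from the root.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal counter; stands in for a metrics library's counter type. */
final class SketchCounter {
  private long count;

  void inc() {
    count++;
  }

  long getCount() {
    return count;
  }
}

/** Hypothetical test group that records counters created by itself or any descendant. */
final class RecordingGroup {
  private final Map<String, SketchCounter> counters;

  RecordingGroup() {
    this(new HashMap<>());
  }

  // Children are built with the parent's map reference, not a copy.
  private RecordingGroup(Map<String, SketchCounter> shared) {
    this.counters = shared;
  }

  /** Creates a child group that shares this group's counter map. */
  RecordingGroup addGroup(String name) {
    return new RecordingGroup(counters);
  }

  /** Registers (or returns) the counter with the given name in the shared map. */
  SketchCounter counter(String name) {
    return counters.computeIfAbsent(name, key -> new SketchCounter());
  }

  Map<String, SketchCounter> counters() {
    return counters;
  }
}

final class SharedCounterMapSketch {
  public static void main(String[] args) {
    RecordingGroup root = new RecordingGroup();
    // The code under test only ever sees the grandchild group.
    SketchCounter assigned =
        root.addGroup("IcebergSourceReader").addGroup("table").counter("assignedSplits");
    assigned.inc();
    // The test still reads the counter through the root group.
    System.out.println(root.counters().get("assignedSplits").getCount());
  }
}
```

   One trade-off in this sketch: counters are keyed by name only, so two child groups that register the same counter name would share one counter. That is acceptable for a test with a single reader, but it would not be a faithful model of a real metric registry.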








[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948673398


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class IcebergSourceReaderMetrics {
+  private final Counter assignedSplits;
+  private final Counter assignedBytes;
+  private final Counter finishedSplits;
+  private final Counter finishedBytes;
+  private final Counter splitReaderFetchCalls;
+
+  public IcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
+    MetricGroup readerMetrics =
+        metrics.addGroup("IcebergSourceReader").addGroup("table", fullTableName);
+
+    this.assignedSplits = readerMetrics.counter("assignedSplits");
+    this.assignedBytes = readerMetrics.counter("assignedBytes");
+    this.finishedSplits = readerMetrics.counter("finishedSplits");
+    this.finishedBytes = readerMetrics.counter("finishedBytes");
+    this.splitReaderFetchCalls = readerMetrics.counter("splitReaderFetchCalls");

Review Comment:
   Good question. It counts the number of times a record batch is polled and is mainly meant as a liveness signal, similar to the Kafka consumer fetch request count.
   
   I am open to removing it if we think it is not very useful.
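
   As a rough illustration of the liveness idea, here is a sketch in which a monitor samples a fetch-call counter and reports whether it advanced since the previous sample. `FetchLivenessSketch` and its methods are hypothetical names, and the sketch assumes the counter is incremented on every fetch call, including calls that return no records.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical liveness check built on a monotonically increasing fetch-call counter. */
final class FetchLivenessSketch {
  // Stand-in for a counter such as splitReaderFetchCalls.
  private final AtomicLong fetchCalls = new AtomicLong();
  // Counter value observed by the previous liveness check.
  private long lastSeen = 0L;

  /** Called on every fetch, whether or not a record batch is returned (assumption). */
  void onFetch() {
    fetchCalls.incrementAndGet();
  }

  /** Returns true if at least one fetch happened since the previous check. */
  boolean advancedSinceLastCheck() {
    long current = fetchCalls.get();
    boolean advanced = current > lastSeen;
    lastSeen = current;
    return advanced;
  }

  public static void main(String[] args) {
    FetchLivenessSketch sketch = new FetchLivenessSketch();
    sketch.onFetch();
    System.out.println("alive after fetch: " + sketch.advancedSinceLastCheck());
    System.out.println("alive with no new fetch: " + sketch.advancedSinceLastCheck());
  }
}
```

   A counter that stops advancing while splits are still assigned would suggest a stuck reader, which record or byte counters alone cannot distinguish from an idle source.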





[GitHub] [iceberg] pvary merged pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
pvary merged PR #5554:
URL: https://github.com/apache/iceberg/pull/5554




[GitHub] [iceberg] mas-chen commented on a diff in pull request #5554: Fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948302071


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class IcebergSourceReaderMetrics {
+  private final Counter assignedSplits;
+  private final Counter assignedBytes;
+  private final Counter finishedSplits;
+  private final Counter finishedBytes;
+  private final Counter splitReaderFetchCalls;
+
+  public IcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
+    MetricGroup readerMetrics =
+        metrics.addGroup("IcebergSourceReader").addGroup("table", fullTableName);
+
+    this.assignedSplits = readerMetrics.counter("assignedSplits");
+    this.assignedBytes = readerMetrics.counter("assignedBytes");
+    this.finishedSplits = readerMetrics.counter("finishedSplits");
+    this.finishedBytes = readerMetrics.counter("finishedBytes");
+    this.splitReaderFetchCalls = readerMetrics.counter("splitReaderFetchCalls");

Review Comment:
   How is this metric used?





[GitHub] [iceberg] pvary commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948699934


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final GenericAppenderFactory appenderFactory;
+
+  public TestIcebergSourceReader() {
+    this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testReaderMetrics() throws Exception {
+    TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    reader.start();
+
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
+  }
+
+  private void testOneSplitFetcher(
+      IcebergSourceReader reader,
+      TestingReaderOutput<RowData> readerOutput,
+      TestingMetricGroup metricGroup,
+      int expectedCount)
+      throws Exception {
+    long seed = expectedCount;
+    // Each split should contain only one file with one record
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
+    reader.addSplits(Arrays.asList(split));
+
+    while (readerOutput.getEmittedRecords().size() < expectedCount) {
+      reader.pollNext(readerOutput);
+    }
+
+    Assert.assertEquals(expectedCount, readerOutput.getEmittedRecords().size());
+    TestHelpers.assertRowData(
+        TestFixtures.SCHEMA,
+        recordBatchList.get(0).get(0),
+        readerOutput.getEmittedRecords().get(expectedCount - 1));
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("assignedSplits").getCount());
+
+    // One more poll will get null record batch.
+    // That will finish the split and cause split fetcher to be closed due to idleness.
+    // Then next split will create a new split reader.
+    reader.pollNext(readerOutput);
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("finishedSplits").getCount());
+  }
+
+  private IcebergSourceReader createReader(
+      MetricGroup metricGroup, SourceReaderContext readerContext) {
+    IcebergSourceReaderMetrics readerMetrics =
+        new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            TestFixtures.SCHEMA,
+            TestFixtures.SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager());
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
+  }
+
+  private static class TestingMetricGroup extends UnregisteredMetricsGroup
+      implements SourceReaderMetricGroup {
+    private final Map<String, Counter> counters;
+
+    private TestingMetricGroup() {
+      this.counters = Maps.newHashMap();
+    }
+
+    /** Pass along the reference to share the map for child metric groups. */
+    private TestingMetricGroup(Map<String, Counter> counters) {
+      this.counters = counters;
+    }
+
+    @Override
+    public Counter counter(String name) {
+      Counter counter = new SimpleCounter();
+      counters.put(name, counter);
+      return counter;
+    }
+
+    @Override
+    public MetricGroup addGroup(String name) {
+      return new TestingMetricGroup(counters);
+    }
+
+    @Override
+    public MetricGroup addGroup(String key, String value) {
+      return new TestingMetricGroup(counters);
+    }
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+      return new TestingOperatorIOMetricGroup();
+    }
+
+    @Override
+    public Counter getNumRecordsInErrorsCounter() {
+      return new SimpleCounter();
+    }
+
+    @Override
+    public void setPendingBytesGauge(Gauge<Long> pendingBytesGauge) {}

Review Comment:
   Nit: I have not seen empty methods formatted this way in the Iceberg code yet. Could you please check how they should be formatted?





[GitHub] [iceberg] pvary commented on pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
pvary commented on PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#issuecomment-1219088977

   LGTM, just 2 small questions.
   Thanks, Peter 




[GitHub] [iceberg] pvary commented on pull request #5554: Fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
pvary commented on PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#issuecomment-1217583150

   @stevenzwu: Could we create a test for the bug mentioned above?
   
   Also, could we prefix the PR title with `Flink:`?
   
   Thanks, Peter 




[GitHub] [iceberg] pvary commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948697801


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final GenericAppenderFactory appenderFactory;
+
+  public TestIcebergSourceReader() {
+    this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testReaderMetrics() throws Exception {
+    TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    reader.start();
+
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
+  }
+
+  private void testOneSplitFetcher(
+      IcebergSourceReader reader,
+      TestingReaderOutput<RowData> readerOutput,
+      TestingMetricGroup metricGroup,
+      int expectedCount)
+      throws Exception {
+    long seed = expectedCount;
+    // Each split should contain only one file with one record
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
+    reader.addSplits(Arrays.asList(split));
+
+    while (readerOutput.getEmittedRecords().size() < expectedCount) {
+      reader.pollNext(readerOutput);
+    }
+
+    Assert.assertEquals(expectedCount, readerOutput.getEmittedRecords().size());
+    TestHelpers.assertRowData(
+        TestFixtures.SCHEMA,
+        recordBatchList.get(0).get(0),
+        readerOutput.getEmittedRecords().get(expectedCount - 1));
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("assignedSplits").getCount());
+
+    // One more poll will get null record batch.
+    // That will finish the split and cause split fetcher to be closed due to idleness.
+    // Then next split will create a new split reader.
+    reader.pollNext(readerOutput);
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("finishedSplits").getCount());
+  }
+
+  private IcebergSourceReader createReader(
+      MetricGroup metricGroup, SourceReaderContext readerContext) {
+    IcebergSourceReaderMetrics readerMetrics =
+        new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            TestFixtures.SCHEMA,
+            TestFixtures.SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager());
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
+  }
+
+  private static class TestingMetricGroup extends UnregisteredMetricsGroup

Review Comment:
   Just a question: Is it possible that this class would be useful for other tests later? If so, maybe putting it in the `org.apache.flink.connector.testutils.source.reader` package would be better. On the other hand, if we think this will never be used anywhere else, then this is a good place.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r949300722


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final GenericAppenderFactory appenderFactory;
+
+  public TestIcebergSourceReader() {
+    this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testReaderMetrics() throws Exception {
+    TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    reader.start();
+
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
+    testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
+  }
+
+  private void testOneSplitFetcher(
+      IcebergSourceReader reader,
+      TestingReaderOutput<RowData> readerOutput,
+      TestingMetricGroup metricGroup,
+      int expectedCount)
+      throws Exception {
+    long seed = expectedCount;
+    // Each split should contain only one file with one record
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask task =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
+    reader.addSplits(Arrays.asList(split));
+
+    while (readerOutput.getEmittedRecords().size() < expectedCount) {
+      reader.pollNext(readerOutput);
+    }
+
+    Assert.assertEquals(expectedCount, readerOutput.getEmittedRecords().size());
+    TestHelpers.assertRowData(
+        TestFixtures.SCHEMA,
+        recordBatchList.get(0).get(0),
+        readerOutput.getEmittedRecords().get(expectedCount - 1));
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("assignedSplits").getCount());
+
+    // One more poll will get null record batch.
+    // That will finish the split and cause split fetcher to be closed due to idleness.
+    // Then next split will create a new split reader.
+    reader.pollNext(readerOutput);
+    Assert.assertEquals(expectedCount, metricGroup.counters.get("finishedSplits").getCount());
+  }
+
+  private IcebergSourceReader createReader(
+      MetricGroup metricGroup, SourceReaderContext readerContext) {
+    IcebergSourceReaderMetrics readerMetrics =
+        new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            TestFixtures.SCHEMA,
+            TestFixtures.SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager());
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
+  }
+
+  private static class TestingMetricGroup extends UnregisteredMetricsGroup
+      implements SourceReaderMetricGroup {
+    private final Map<String, Counter> counters;
+
+    private TestingMetricGroup() {
+      this.counters = Maps.newHashMap();
+    }
+
+    /** Pass along the reference to share the map for child metric groups. */
+    private TestingMetricGroup(Map<String, Counter> counters) {
+      this.counters = counters;
+    }
+
+    @Override
+    public Counter counter(String name) {
+      Counter counter = new SimpleCounter();
+      counters.put(name, counter);
+      return counter;
+    }
+
+    @Override
+    public MetricGroup addGroup(String name) {
+      return new TestingMetricGroup(counters);
+    }
+
+    @Override
+    public MetricGroup addGroup(String key, String value) {
+      return new TestingMetricGroup(counters);
+    }
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+      return new TestingOperatorIOMetricGroup();
+    }
+
+    @Override
+    public Counter getNumRecordsInErrorsCounter() {
+      return new SimpleCounter();
+    }
+
+    @Override
+    public void setPendingBytesGauge(Gauge<Long> pendingBytesGauge) {}

Review Comment:
   This is from the latest spotless auto-formatting with google-java-format.
   
   Here is an example from `FileIO`:
   ```
    default void initialize(Map<String, String> properties) {}
   ```





[GitHub] [iceberg] stevenzwu commented on pull request #5554: Update metrics group to be more consistent as other connector metrics.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#issuecomment-1217290283

   @mas-chen @pvary can you please help take a look?




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#discussion_r948673824


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java:
##########
@@ -66,29 +63,6 @@ public ReaderFunctionTestBase(FileFormat fileFormat) {
     this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
   }
 
-  private List<List<Record>> createRecordBatchList(int batchCount) {

Review Comment:
   These two util methods were moved to `ReaderUtil`.





[GitHub] [iceberg] stevenzwu commented on pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#issuecomment-1219063347

   @pvary It took some effort, but I managed to add a unit test with a custom metric group class so that counter values can be inspected.




[GitHub] [iceberg] pvary commented on pull request #5554: Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style.

Posted by GitBox <gi...@apache.org>.
pvary commented on PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#issuecomment-1219890536

   Thanks @stevenzwu for the fix and @mas-chen for the review!




[GitHub] [iceberg] stevenzwu commented on pull request #5554: Update metrics group to be more consistent as other connector metrics.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #5554:
URL: https://github.com/apache/iceberg/pull/5554#issuecomment-1217334123

   Just realized there is a bug in the source reader metrics. We can't register them in the split reader, because the split reader can be re-created after the previous fetcher has exited. As a result, we will see a registration failure later and the metric values will never be updated again.
   
   ```
   2022-08-17 00:22:45,444 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'assignedSplits'. Metric will not be reported.[flink, operator, IcebergSourceReader, hive.db.tbl]
   ```
   
   The metrics need to be constructed at the reader (not split reader) level.
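
The failure mode described above can be sketched in plain Java. This is only a model under stated assumptions: `SketchMetricGroup` is a hypothetical stand-in that keeps the first counter registered under a name and does not report later ones, which is the behavior the `Name collision` warning suggests; `SketchReaderMetrics` is a hypothetical stand-in for the metrics holder. Neither is a Flink or Iceberg class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class MetricsRegistrationSketch {
  /** Hypothetical metric group: the first registration of a name wins. */
  static class SketchMetricGroup {
    private final Map<String, AtomicLong> reported = new HashMap<>();

    /** Always returns a usable counter, but a colliding counter is never reported. */
    AtomicLong counter(String name) {
      AtomicLong counter = new AtomicLong();
      // putIfAbsent keeps the existing entry, modeling "Metric will not be reported".
      reported.putIfAbsent(name, counter);
      return counter;
    }

    long reportedValue(String name) {
      return reported.get(name).get();
    }
  }

  /** Hypothetical metrics holder that registers its counter on construction. */
  static class SketchReaderMetrics {
    private final AtomicLong assignedSplits;

    SketchReaderMetrics(SketchMetricGroup group) {
      this.assignedSplits = group.counter("assignedSplits");
    }

    void incrementAssignedSplits() {
      assignedSplits.incrementAndGet();
    }
  }

  /** Buggy pattern: every re-created split reader registers its own metrics. */
  static long registerPerSplitReader(int splitReaders) {
    SketchMetricGroup group = new SketchMetricGroup();
    for (int i = 0; i < splitReaders; i++) {
      SketchReaderMetrics metrics = new SketchReaderMetrics(group);
      metrics.incrementAssignedSplits();
    }
    return group.reportedValue("assignedSplits");
  }

  /** Fixed pattern: register once at the reader level and share the instance. */
  static long registerOncePerReader(int splitReaders) {
    SketchMetricGroup group = new SketchMetricGroup();
    SketchReaderMetrics metrics = new SketchReaderMetrics(group);
    for (int i = 0; i < splitReaders; i++) {
      metrics.incrementAssignedSplits();
    }
    return group.reportedValue("assignedSplits");
  }

  public static void main(String[] args) {
    System.out.println("per split reader: " + registerPerSplitReader(3));
    System.out.println("once per reader:  " + registerOncePerReader(3));
  }
}
```

In this model, three split readers that each register their own metrics leave the reported value stuck at the first reader's count, while a single reader-level instance reflects all three increments.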

