Posted to issues@paimon.apache.org by "tsreaper (via GitHub)" <gi...@apache.org> on 2023/11/14 04:58:32 UTC

Re: [PR] [Flink]add Flink Writer Metric [incubator-paimon]

tsreaper commented on code in PR #2193:
URL: https://github.com/apache/incubator-paimon/pull/2193#discussion_r1390853111


##########
paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java:
##########
@@ -133,6 +139,11 @@ public void write(InternalRow rowData) throws Exception {
                 throw new RuntimeException("Mem table is too small to hold a single element.");
             }
         }
+
+        if (writerMetrics != null) {
+            writerMetrics.incWriteRecordNum();
+            writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start);

Review Comment:
   Most of the time, `System.currentTimeMillis() - start` will be `0` because records are only buffered in memory. This metric is only meaningful when we write files to disk or perform compaction.
   
   Also, calling `System.currentTimeMillis()` for each write will introduce a performance issue. Please be very careful with code that is called per record.
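
   One way to read this suggestion, as a minimal sketch: keep the per-record path free of clock calls and time only the flush. `BufferedWriterSketch`, its injectable clock, and the list standing in for a histogram are all hypothetical and are not Paimon classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical writer sketch; not Paimon's AppendOnlyWriter. */
final class BufferedWriterSketch {
    private final int capacity;
    private final LongSupplier clock; // injectable so the timing can be tested
    private final List<String> buffer = new ArrayList<>();
    private final List<Long> flushCostsMs = new ArrayList<>(); // stand-in for a histogram
    private long recordCount; // stand-in for a counter

    BufferedWriterSketch(int capacity, LongSupplier clock) {
        this.capacity = capacity;
        this.clock = clock;
    }

    void write(String record) {
        // Hot path: only a buffer append and a counter increment, no clock call.
        buffer.add(record);
        recordCount++;
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // The clock is read twice per flush, not twice per record.
        long start = clock.getAsLong();
        buffer.clear(); // a real writer would write the buffered records to a file here
        flushCostsMs.add(clock.getAsLong() - start);
    }

    long recordCount() {
        return recordCount;
    }

    List<Long> flushCostsMs() {
        return flushCostsMs;
    }
}
```

   In production the clock would be something like `System::currentTimeMillis`; it is a parameter here only so the sketch can be exercised deterministically.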



##########
paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java:
##########
@@ -204,13 +224,15 @@ private RowDataRollingFileWriter createRollingRowWriter() {
 
     private void trySyncLatestCompaction(boolean blocking)
             throws ExecutionException, InterruptedException {
+        long start = System.currentTimeMillis();
         compactManager
                 .getCompactionResult(blocking)
                 .ifPresent(
                         result -> {
                             compactBefore.addAll(result.before());
                             compactAfter.addAll(result.after());
                         });
+        writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start);

Review Comment:
   If `blocking` is false, this value is certainly `0` because we won't wait for compaction to end. Why do you record this metric?
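
   If the metric is kept at all, one option is to record it only on the blocking path. The sketch below is an assumption about how that could look; `CompactionSyncSketch` and the `Function` standing in for the compact manager are hypothetical names, not Paimon's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical sketch; the Function stands in for compactManager.getCompactionResult. */
final class CompactionSyncSketch {
    private final Function<Boolean, Optional<String>> compactionSource;
    private final LongSupplier clock;
    private final List<String> results = new ArrayList<>();
    private final List<Long> syncCostsMs = new ArrayList<>(); // stand-in for a histogram

    CompactionSyncSketch(
            Function<Boolean, Optional<String>> compactionSource, LongSupplier clock) {
        this.compactionSource = compactionSource;
        this.clock = clock;
    }

    void trySyncLatestCompaction(boolean blocking) {
        if (!blocking) {
            // Non-blocking polls return immediately, so timing them adds only zero samples.
            compactionSource.apply(false).ifPresent(results::add);
            return;
        }
        long start = clock.getAsLong();
        compactionSource.apply(true).ifPresent(results::add);
        syncCostsMs.add(clock.getAsLong() - start);
    }

    List<String> results() {
        return results;
    }

    List<Long> syncCostsMs() {
        return syncCostsMs;
    }
}
```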



##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java:
##########
@@ -125,4 +126,6 @@ List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
 
     /** With metrics to measure compaction. */
     FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
+
+    WriterMetrics getWriterMetrics();

Review Comment:
   Why do you add this method in the interface when it is only used internally by `AbstractFileStoreWrite` and its subclasses?
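
   As an illustration of the alternative, the accessor can live on the abstract base class with `protected` visibility, so the public interface stays unchanged. All names below are hypothetical, and an `AtomicLong` stands in for the metrics object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; none of these types are Paimon's. */
final class InternalMetricsSketch {

    /** Public contract: no metrics accessor is exposed here. */
    interface StoreWrite<T> {
        void write(T record);
    }

    /** The base class owns the metrics and shares them with subclasses only. */
    abstract static class AbstractStoreWrite<T> implements StoreWrite<T> {
        private final AtomicLong writtenRecords = new AtomicLong(); // stand-in for WriterMetrics

        protected AtomicLong writerMetrics() {
            return writtenRecords;
        }
    }

    /** A concrete writer that uses the protected accessor internally. */
    static final class ListStoreWrite extends AbstractStoreWrite<String> {
        private final List<String> records = new ArrayList<>();

        @Override
        public void write(String record) {
            records.add(record);
            writerMetrics().incrementAndGet();
        }

        long writtenCount() {
            return writerMetrics().get();
        }
    }
}
```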



##########
paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java:
##########
@@ -151,6 +158,11 @@ public void write(KeyValue kv) throws Exception {
                 throw new RuntimeException("Mem table is too small to hold a single element.");
             }
         }
+
+        if (writerMetrics != null) {
+            writerMetrics.incWriteRecordNum();
+            writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start);

Review Comment:
   Same as the comment on `AppendOnlyWriter#write` above.



##########
paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+import java.util.function.Supplier;
+
+/** Metrics for writer. */
+public class WriterMetrics {
+
+    private static final String GROUP_NAME = "writer";
+
+    private static final int WINDOW_SAMPLE_SIZE = 10000;
+    private static final String WRITE_RECORD_NUM = "writeRecordCount";
+
+    private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
+
+    private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
+
+    private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";
+
+    public static final String WRITE_COST_MS = "writeCostMS";
+
+    private static final String FLUSH_COST_MS = "flushCostMS";
+
+    public static final String PREPARE_COMMIT_COST = "prepareCommitCostMS";
+
+    public static final String SYNC_LASTEST_COMPACTION_COST_MS = "syncLastestCompactionCostMS";
+
+    private final Counter writeRecordNumCounter;
+
+    private final Gauge<Long> memoryPreemptCount;
+
+    private final Gauge<Long> usedWriteBufferSizeGauge;
+
+    private final Gauge<Long> totalWriteBufferSizeGauge;
+
+    private final Histogram writeCostMS;
+
+    private final Histogram bufferFlushCostMS;
+
+    private final Histogram prepareCommitCostMS;
+
+    private final Histogram syncLastestCompactionCostMS;
+
+    private Stats stats;
+
+    public WriterMetrics(MetricRegistry registry, String tableName, String commitUser) {
+        stats = new Stats();
+        MetricGroup metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName, commitUser);
+        writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM);
+
+        // buffer
+        memoryPreemptCount =
+                metricGroup.gauge(BUFFER_PREEMPT_COUNT, () -> stats.bufferPreemptCount.get());
+
+        usedWriteBufferSizeGauge =
+                metricGroup.gauge(USED_WRITE_BUFFER_SIZE, () -> stats.usedWriteBufferSize.get());
+
+        totalWriteBufferSizeGauge =
+                metricGroup.gauge(TOTAL_WRITE_BUFFER_SIZE, () -> stats.totalWriteBufferSize.get());
+
+        // cost
+        writeCostMS = metricGroup.histogram(WRITE_COST_MS, WINDOW_SAMPLE_SIZE);
+        bufferFlushCostMS = metricGroup.histogram(FLUSH_COST_MS, WINDOW_SAMPLE_SIZE);
+
+        // prepareCommittime
+        prepareCommitCostMS = metricGroup.histogram(PREPARE_COMMIT_COST, WINDOW_SAMPLE_SIZE);
+
+        syncLastestCompactionCostMS =
+                metricGroup.histogram(SYNC_LASTEST_COMPACTION_COST_MS, WINDOW_SAMPLE_SIZE);
+    }
+
+    public void incWriteRecordNum() {
+        writeRecordNumCounter.inc();
+    }
+
+    public void updateWriteCostMS(long bufferAppendCost) {
+        writeCostMS.update(bufferAppendCost);
+    }
+
+    public void updateBufferFlushCostMS(long bufferFlushCost) {
+        bufferFlushCostMS.update(bufferFlushCost);
+    }
+
+    public void updatePrepareCommitCostMS(long cost) {
+        this.prepareCommitCostMS.update(cost);
+    }
+
+    public void updateSyncLastestCompactionCostMS(long cost) {
+        this.syncLastestCompactionCostMS.update(cost);
+    }
+
+    public void setMemoryPreemptCount(Supplier<Long> bufferPreemptNumSupplier) {
+        this.stats.bufferPreemptCount = bufferPreemptNumSupplier;
+    }
+
+    public void setUsedWriteBufferSize(Supplier<Long> usedWriteBufferSize) {
+        this.stats.usedWriteBufferSize = usedWriteBufferSize;
+    }
+
+    public void setTotaldWriteBufferSize(Supplier<Long> totaldWriteBufferSize) {
+        this.stats.totalWriteBufferSize = totaldWriteBufferSize;
+    }

Review Comment:
   Call these methods directly from `MemoryPoolFactory`; then we don't need to introduce the inner `Stats` class.
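
   A rough sketch of the idea, under the assumption that the component owning the buffer numbers registers its own gauges, so no mutable holder of suppliers is needed. `SimpleMetricGroup` and `SimpleMemoryPool` are hypothetical stand-ins, not Paimon's `MetricGroup` or `MemoryPoolFactory`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of registering gauges directly from the owner of the values. */
final class GaugeRegistrationSketch {

    /** Minimal metric group: gauge name mapped to its value supplier. */
    static final class SimpleMetricGroup {
        private final Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();

        void gauge(String name, Supplier<Long> value) {
            gauges.put(name, value);
        }

        long read(String name) {
            return gauges.get(name).get();
        }
    }

    /** Minimal pool that knows its own usage and reports it itself. */
    static final class SimpleMemoryPool {
        private final long totalBytes;
        private long usedBytes;

        SimpleMemoryPool(long totalBytes) {
            this.totalBytes = totalBytes;
        }

        void allocate(long bytes) {
            usedBytes += bytes;
        }

        void registerMetrics(SimpleMetricGroup group) {
            // Gauges read the live fields; no intermediate Stats object is involved.
            group.gauge("usedWriteBufferSizeByte", () -> usedBytes);
            group.gauge("totalWriteBufferSizeByte", () -> totalBytes);
        }
    }
}
```

   Because each gauge closes over the pool's own fields, a value read after `allocate` reflects the current usage without any setter being called on the metrics class.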



##########
paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java:
##########
@@ -27,10 +27,18 @@ public abstract class MetricRegistry {
     private static final String KEY_TABLE = "table";
     private static final String KEY_PARTITION = "partition";
     private static final String KEY_BUCKET = "bucket";
+    private static final String COMMIT_USER = "commit_user";
 
     public MetricGroup tableMetricGroup(String groupName, String tableName) {
+        return tableMetricGroup(groupName, tableName, null);
+    }
+
+    public MetricGroup tableMetricGroup(String groupName, String tableName, String commitUser) {
         Map<String, String> variables = new LinkedHashMap<>();
         variables.put(KEY_TABLE, tableName);
+        if (commitUser != null) {
+            variables.put(COMMIT_USER, commitUser);
+        }

Review Comment:
   Why add `commitUser` as a metric group variable? For Flink jobs, using the job ID is enough.



##########
paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java:
##########
@@ -281,8 +304,10 @@ private void updateCompactResult(CompactResult result) {
     }
 
     private void trySyncLatestCompaction(boolean blocking) throws Exception {
+        long start = System.currentTimeMillis();
         Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
         result.ifPresent(this::updateCompactResult);
+        writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start);

Review Comment:
   Same as the comment on `AppendOnlyWriter#trySyncLatestCompaction` above.



##########
paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java:
##########
@@ -0,0 +1,133 @@
+    public void updatePrepareCommitCostMS(long cost) {
+        this.prepareCommitCostMS.update(cost);

Review Comment:
   NIT: name this `prepareCommitCostMillis` or `prepareCommitCostMs`.


