Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/06 01:32:13 UTC

[GitHub] [beam] hengfengli commented on a diff in pull request #17200: [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector

hengfengli commented on code in PR #17200:
URL: https://github.com/apache/beam/pull/17200#discussion_r843386991


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** An estimator to provide an estimate on the throughput of the outputted elements. */
+public class ThroughputEstimator implements Serializable {
+
+  private static class Pair<K, V> {
+    private final K first;
+    private final V second;
+
+    public Pair(K first, V second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    public K getFirst() {
+      return first;
+    }
+
+    public V getSecond() {
+      return second;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("first: %s, second: %s", first, second);
+    }
+  }
+
+  private static final long serialVersionUID = -3597929310338724800L;
+  // The start time of each per-second window.
+  private Timestamp startTimeOfCurrentWindow;
+  // The bytes of the current window.
+  private double bytesInCurrentWindow;
+  // The number of seconds to look in the past.
+  private final int numOfSeconds = 60;
+  // The total bytes of all windows in the queue.
+  private double bytesInQueue;
+  // The queue holds a number of windows in the past in order to calculate
+  // a rolling windowing throughput.
+  private Queue<Pair<Timestamp, Double>> queue;
+
+  public ThroughputEstimator() {
+    queue = new ArrayDeque<>();
+  }
+
+  /**
+   * Updates the estimator with the bytes of records.
+   *
+   * @param timeOfRecords the committed timestamp of the records
+   * @param bytes the total bytes of the records
+   */
+  public void update(Timestamp timeOfRecords, double bytes) {
+    if (startTimeOfCurrentWindow == null) {
+      bytesInCurrentWindow = bytes;
+      startTimeOfCurrentWindow = timeOfRecords;
+      return;
+    }
+
+    if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) {

Review Comment:
   You are asking: what if `timeOfRecords == startTimeOfCurrentWindow`?
   
   Did you miss the `+ 1` at the end? If `timeOfRecords == startTimeOfCurrentWindow`, the condition `timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1` is true, so execution stays in the `if` branch and never reaches the `else` branch.
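
To make the boundary case concrete, here is a minimal sketch of the same comparison. It is not code from the PR: `WindowCheck` and `inCurrentWindow` are hypothetical names, and plain `long` epoch seconds stand in for the values returned by `Timestamp.getSeconds()`.

```java
/** Hypothetical helper, not part of the PR: mirrors the window check on plain epoch seconds. */
final class WindowCheck {
  private WindowCheck() {}

  /**
   * Same comparison as the condition under review: true when the record's second
   * is strictly less than the window's start second plus one.
   */
  static boolean inCurrentWindow(long recordSeconds, long windowStartSeconds) {
    return recordSeconds < windowStartSeconds + 1;
  }

  public static void main(String[] args) {
    long windowStart = 100L; // assumed start second of the current window

    // Record in the same second as the window start: 100 < 101, so the `if` branch is taken.
    System.out.println(inCurrentWindow(100L, windowStart)); // prints "true"

    // Record in the next second: 101 < 101 is false, so the `else` branch is taken.
    System.out.println(inCurrentWindow(101L, windowStart)); // prints "false"
  }
}
```

Because both sides use whole seconds, the nanosecond part of the timestamps plays no role in this check.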


