Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/19 03:21:59 UTC

[GitHub] [beam] ajamato commented on a change in pull request #12609: [BEAM-10699] Logging BigQuery streaming insert tail latencies

ajamato commented on a change in pull request #12609:
URL: https://github.com/apache/beam/pull/12609#discussion_r472624659



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/Histogram.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.math.RoundingMode;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A histogram that supports estimated percentile with linear interpolation. */
+public class Histogram {
+  private static final Logger LOG = LoggerFactory.getLogger(Histogram.class);
+
+  private static final int DEFAULT_NUM_OF_BUCKETS = 50;
+
+  private final double rangeFrom;
+  private final double rangeTo;
+  private final int numOfBuckets;

Review comment:
       nit: rename to numBuckets

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/Histogram.java
##########
@@ -0,0 +1,151 @@
+  private final double rangeFrom;
+  private final double rangeTo;
+  private final int numOfBuckets;
+
+  private long[] buckets;
+  private final double bucketSize;
+  private long totalNumOfRecords;
+
+  private final boolean ignoreOutOfRangeRecord;
+
+  private Histogram(
+      double rangeFrom, double rangeTo, int numOfBuckets, boolean ignoreOutOfRangeRecord) {
+    if (rangeFrom < 0) {
+      throw new RuntimeException(String.format("only positive range allowed: %f", rangeFrom));
+    }
+    if (rangeFrom >= rangeTo) {
+      throw new RuntimeException(
+          String.format("rangeTo should be larger than rangeFrom: [%f, %f)", rangeFrom, rangeTo));
+    }
+    if (numOfBuckets <= 0) {
+      throw new RuntimeException(
+          String.format("numOfBuckets should be greater than zero: %d", numOfBuckets));
+    }
+    this.rangeFrom = rangeFrom;
+    this.rangeTo = rangeTo;
+    this.numOfBuckets = numOfBuckets;
+    this.ignoreOutOfRangeRecord = ignoreOutOfRangeRecord;
+    this.buckets = new long[numOfBuckets];
+    this.bucketSize = (rangeTo - rangeFrom) / numOfBuckets;
+    this.totalNumOfRecords = 0;
+  }
+
+  /**
+   * Create a histogram.
+   *
+   * @param rangeFrom The minimum value that this histogram can record. Cannot be negative.

Review comment:
       Why restrict this to disallow negatives? Any range of numbers is valid for bucket boundaries.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/Histogram.java
##########
@@ -0,0 +1,151 @@
+  private final boolean ignoreOutOfRangeRecord;
+
+  private Histogram(
+      double rangeFrom, double rangeTo, int numOfBuckets, boolean ignoreOutOfRangeRecord) {

Review comment:
       I think that we can remove the ignoreOutOfRangeRecord parameter.
   
   Let the first and last buckets extend to -INF and +INF respectively,
   i.e. for bucket boundaries b0, b1, ..., bn, the buckets are:
   
   (-INF, b0), [b0, b1), ..., [bn, +INF)
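A minimal sketch of the open-ended bucketing described above, assuming explicit, strictly increasing boundaries b0..bn. The class and method names (OpenEndedHistogram, bucketIndex, count) are hypothetical and not part of the PR. Because every value maps to some bucket, no ignoreOutOfRangeRecord flag is needed.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: a histogram over explicit boundaries b0 < b1 < ... < bn with n + 2
 * buckets: (-INF, b0), [b0, b1), ..., [b(n-1), bn), [bn, +INF).
 */
final class OpenEndedHistogram {
  private final double[] boundaries; // strictly increasing, non-NaN
  private final long[] counts; // one more bucket than there are boundaries

  OpenEndedHistogram(double... boundaries) {
    if (boundaries.length == 0) {
      throw new IllegalArgumentException("at least one boundary is required");
    }
    for (int i = 0; i < boundaries.length; i++) {
      if (Double.isNaN(boundaries[i]) || (i > 0 && !(boundaries[i - 1] < boundaries[i]))) {
        throw new IllegalArgumentException("boundaries must be strictly increasing and non-NaN");
      }
    }
    this.boundaries = boundaries.clone();
    this.counts = new long[boundaries.length + 1];
  }

  /** Returns the bucket for value; every value, including +/-INF, maps to some bucket. */
  int bucketIndex(double value) {
    int pos = Arrays.binarySearch(boundaries, value);
    // Exact match on b_pos: the half-open bucket [b_pos, b_pos+1) has index pos + 1.
    // Otherwise binarySearch returns -(insertionPoint) - 1, and insertionPoint (the number
    // of boundaries below value) is already the bucket index.
    return pos >= 0 ? pos + 1 : -pos - 1;
  }

  synchronized void record(double value) {
    counts[bucketIndex(value)]++;
  }

  synchronized long count(int bucket) {
    return counts[bucket];
  }
}
```

With boundaries {0, 10, 20}, a value of -5 lands in bucket 0, 10 in bucket 2, and 25 in bucket 3.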

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/Histogram.java
##########
@@ -0,0 +1,151 @@
+  /**
+   * Create a histogram.
+   *
+   * @param rangeFrom The minimum value that this histogram can record. Cannot be negative.
+   * @param rangeTo The maximum value that this histogram can record. Cannot be smaller than or
+   *     equal to rangeFrom.
+   * @param numOfBuckets The number of buckets. Larger number of buckets implies a better resolution
+   *     for percentile estimation.
+   * @param ignoreOutOfRangeRecord Whether the out-of-range records are discarded. It will throw
+   *     RuntimeException for the out-of-range records if this is set to false.
+   * @return a new Histogram instance.
+   */
+  public static Histogram of(
+      double rangeFrom, double rangeTo, int numOfBuckets, boolean ignoreOutOfRangeRecord) {

Review comment:
       Would you mind redoing these parameters so that they represent a linear function, i.e.
   bucket boundaries are defined by:
   
   bi = width * i + start
   for bucket boundaries b0, ..., bn, the buckets are:
   (-INF, b0), [b0, b1), ..., [bn, +INF)
   
   public static Histogram of(
         double startBounds, double bucketWidth, int numOfBuckets, boolean ignoreOutOfRangeRecord)
   
   This was one of the histogram bucket definition styles we considered here:
   https://docs.google.com/document/d/1kiNG2BAR-51pRdBCK4-XFmc0WuIkSuBzeb__Zv8owbU/edit#heading=h.279ygg7gw109
   
   I'll enhance this later to add constructors for using exponential and explicit buckets.
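A sketch of the linear parameterization, assuming b_i = width * i + start for i = 0..numBuckets, with the two open-ended buckets handled separately. LinearBucketSpec, fromRange, boundary and boundaries are hypothetical names. The fromRange helper only illustrates how the PR's current (rangeFrom, rangeTo, numOfBuckets) arguments translate: the setup call of(0, 30000, 1500) corresponds to start 0 and width 20.

```java
/**
 * Hypothetical sketch of linear bucket boundaries b_i = width * i + start, i = 0..numBuckets.
 * Values below b0, or at or above b_numBuckets, would go to the open-ended buckets.
 */
final class LinearBucketSpec {
  final double start;
  final double width;
  final int numBuckets; // number of finite-width buckets between b0 and b_numBuckets

  private LinearBucketSpec(double start, double width, int numBuckets) {
    this.start = start;
    this.width = width;
    this.numBuckets = numBuckets;
  }

  static LinearBucketSpec of(double start, double width, int numBuckets) {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive: " + numBuckets);
    }
    if (!Double.isFinite(start) || !(width > 0) || Double.isInfinite(width)) {
      throw new IllegalArgumentException("start must be finite and width positive");
    }
    return new LinearBucketSpec(start, width, numBuckets);
  }

  /** Translates the PR's (rangeFrom, rangeTo, numOfBuckets) arguments into start/width form. */
  static LinearBucketSpec fromRange(double rangeFrom, double rangeTo, int numBuckets) {
    if (!(rangeFrom < rangeTo)) {
      throw new IllegalArgumentException("rangeTo must be larger than rangeFrom");
    }
    return of(rangeFrom, (rangeTo - rangeFrom) / numBuckets, numBuckets);
  }

  /** Returns b_i = width * i + start. */
  double boundary(int i) {
    if (i < 0 || i > numBuckets) {
      throw new IndexOutOfBoundsException("boundary " + i);
    }
    return width * i + start;
  }

  double[] boundaries() {
    double[] result = new double[numBuckets + 1];
    for (int i = 0; i <= numBuckets; i++) {
      result[i] = boundary(i);
    }
    return result;
  }
}
```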

##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/LoggingHttpRequestInitializer.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import com.google.api.client.http.HttpExecuteInterceptor;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseInterceptor;
+import java.io.IOException;
+import org.apache.beam.sdk.util.Histogram;
+
+public class LoggingHttpRequestInitializer implements HttpRequestInitializer {

Review comment:
       Add a comment describing the purpose of this class: to record the latency of HTTP-based API calls, e.g. for BigQuery IO.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
##########
@@ -125,6 +148,20 @@ public void finishBundle(FinishBundleContext context) throws Exception {
     for (ValueInSingleWindow<ErrorT> row : failedInserts) {
       context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
     }
+
+    if (histogram.getTotalCount() > options.getLatencyLoggingFrequency()) {

Review comment:
       Is there any reason to suspect that this will not log if a request fails and no further requests are attempted? Logging on a periodic interval might make more sense.
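One way to act on the periodic-logging suggestion, sketched with only the JDK: a scheduler emits a summary every interval, and close() flushes whatever is left, so output is still produced after requests stop arriving. PeriodicLatencyReporter is a hypothetical name, and the count/max summary is a stand-in for the PR's Histogram percentiles that keeps the example self-contained.

```java
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: report latency summaries on a fixed schedule, not on a record count. */
final class PeriodicLatencyReporter implements AutoCloseable {
  private final Consumer<String> sink; // e.g. a logger's info method
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "latency-reporter");
            thread.setDaemon(true); // do not keep the JVM alive just for reporting
            return thread;
          });
  private long count;
  private double maxMillis;

  PeriodicLatencyReporter(Consumer<String> sink) {
    this.sink = sink;
  }

  /** Reports every period, independently of whether new requests arrive. */
  void start(long period, TimeUnit unit) {
    scheduler.scheduleAtFixedRate(this::reportNow, period, period, unit);
  }

  synchronized void record(double latencyMillis) {
    count++;
    maxMillis = Math.max(maxMillis, latencyMillis);
  }

  /** Emits a summary of the current interval, if anything was recorded, and resets it. */
  synchronized void reportNow() {
    if (count == 0) {
      return; // nothing recorded in this interval
    }
    sink.accept(String.format(Locale.ROOT, "%d requests, max latency %.1f ms", count, maxMillis));
    count = 0;
    maxMillis = 0;
  }

  @Override
  public void close() {
    scheduler.shutdown();
    reportNow(); // flush whatever was recorded since the last report
  }
}
```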

##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/LoggingHttpRequestInitializer.java
##########
@@ -0,0 +1,62 @@
+import org.apache.beam.sdk.util.Histogram;
+

Review comment:
       Please rename to LatencyRecordingHttpRequestInitializer (so that we can possibly update it to use metrics instead of logging).

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/Histogram.java
##########
@@ -0,0 +1,151 @@
+/** A histogram that supports estimated percentile with linear interpolation. */
+public class Histogram {

Review comment:
       Personally, I think this Histogram class is sufficient, and we should proceed with it for now.
   There may be arguments for using a histogram library in the future, e.g. to support sparsely populated histograms.
   Please add a TODO to consider implementing this with other options, such as:
   
   Apache Commons
   https://www.baeldung.com/apache-commons-frequency
   
   GitHub Hdr Histogram
   https://github.com/HdrHistogram/HdrHistogram

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
##########
@@ -68,6 +69,11 @@ public DatasetService getDatasetService(BigQueryOptions bqOptions) {
     return datasetService;
   }
 
+  @Override
+  public DatasetService getDatasetService(BigQueryOptions bqOptions, Histogram histogram) {

Review comment:
       I don't think we should be passing in the histogram to the factory for building the service.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/Histogram.java
##########
@@ -0,0 +1,151 @@
+  public static Histogram of(
+      double rangeFrom, double rangeTo, int numOfBuckets, boolean ignoreOutOfRangeRecord) {
+    return new Histogram(rangeFrom, rangeTo, numOfBuckets, ignoreOutOfRangeRecord);
+  }
+
+  public static Histogram of(double rangeFrom, double rangeTo) {
+    return new Histogram(rangeFrom, rangeTo, DEFAULT_NUM_OF_BUCKETS, false);
+  }
+
+  public void record(double... values) {
+    for (double value : values) {
+      record(value);
+    }
+  }
+
+  public synchronized void clear() {
+    this.buckets = new long[numOfBuckets];
+    this.totalNumOfRecords = 0;
+  }
+
+  public synchronized void record(double value) {
+    if (value >= rangeTo || value < rangeFrom) {

Review comment:
       Please place out-of-range values in the first or last bucket, so that those buckets extend to -INF and +INF.
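A sketch of what record() and percentile estimation could look like once out-of-range values go into the first and last buckets, assuming linear buckets (start, width, numBuckets) as suggested earlier in this review. ClampingHistogram is a hypothetical name. Percentiles that fall in a finite bucket are linearly interpolated within it; a percentile that falls in an open-ended bucket has no finite bound to interpolate toward, so this sketch returns -INF or +INF (returning the nearest finite boundary instead would be an equally valid choice).

```java
/**
 * Hypothetical sketch: linear buckets plus underflow/overflow buckets, with percentiles
 * estimated by linear interpolation inside the finite buckets.
 */
final class ClampingHistogram {
  private final double start;
  private final double width;
  private final int numBuckets;
  private final long[] counts; // [0] = (-INF, start), [1..numBuckets] finite, last = [end, +INF)
  private long total;

  ClampingHistogram(double start, double width, int numBuckets) {
    if (!(width > 0) || numBuckets <= 0) {
      throw new IllegalArgumentException("width and numBuckets must be positive");
    }
    this.start = start;
    this.width = width;
    this.numBuckets = numBuckets;
    this.counts = new long[numBuckets + 2];
  }

  synchronized void record(double value) {
    if (Double.isNaN(value)) {
      throw new IllegalArgumentException("NaN cannot be bucketed");
    }
    int index;
    if (value < start) {
      index = 0; // underflow bucket, extends to -INF
    } else {
      double offset = (value - start) / width;
      // Overflow bucket extends to +INF; otherwise truncation equals floor since offset >= 0.
      index = offset >= numBuckets ? numBuckets + 1 : (int) offset + 1;
    }
    counts[index]++;
    total++;
  }

  /** Estimates the p-th percentile (0 <= p <= 100) by linear interpolation within a bucket. */
  synchronized double percentile(double p) {
    if (!(p >= 0 && p <= 100)) {
      throw new IllegalArgumentException("p must be in [0, 100]: " + p);
    }
    if (total == 0) {
      throw new IllegalStateException("no values recorded");
    }
    double rank = p / 100.0 * total; // target position in the sorted records
    long cumulative = 0;
    for (int i = 0; i < counts.length; i++) {
      if (counts[i] == 0) {
        continue; // skip empty buckets so we never divide by zero
      }
      if (cumulative + counts[i] >= rank) {
        if (i == 0) {
          return Double.NEGATIVE_INFINITY;
        }
        if (i == numBuckets + 1) {
          return Double.POSITIVE_INFINITY;
        }
        double lower = start + (i - 1) * width;
        double fraction = (rank - cumulative) / counts[i];
        return lower + fraction * width;
      }
      cumulative += counts[i];
    }
    throw new IllegalStateException("unreachable: rank <= total");
  }
}
```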

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
##########
@@ -80,6 +89,20 @@
     this.toTableRow = toTableRow;
   }
 
+  @Setup
+  public void setup() {
+    // record latency upto 30 seconds in the resolution of 20ms
+    histogram = Histogram.of(0, 30000, 1500, true);

Review comment:
       30s seems a bit small. What is the timeout set to? I recommend making the maximum at least another 30s or so larger than the timeout.
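A rough way to size the range: with linear buckets, the bucket count grows with (timeout + margin) / resolution. The helper below is hypothetical, and the 60 s timeout in the usage note is purely illustrative, since the actual timeout is not stated here.

```java
/** Hypothetical arithmetic helper for sizing a linear latency histogram. */
final class BucketBudget {
  private BucketBudget() {}

  /** Buckets needed to cover [0, timeout + margin) at the given resolution, rounded up. */
  static int numBuckets(long timeoutMillis, long marginMillis, long resolutionMillis) {
    if (resolutionMillis <= 0) {
      throw new IllegalArgumentException("resolution must be positive");
    }
    long maxMillis = Math.addExact(timeoutMillis, marginMillis);
    return Math.toIntExact((maxMillis + resolutionMillis - 1) / resolutionMillis);
  }

  /** Memory for the long[] counters alone, ignoring the array header. */
  static long counterBytes(int numBuckets) {
    return (long) numBuckets * Long.BYTES;
  }
}
```

numBuckets(30000, 0, 20) gives the PR's current 1500 buckets. A hypothetical 60 s timeout plus a 30 s margin at the same 20 ms resolution gives numBuckets(60000, 30000, 20) = 4500 buckets, i.e. counterBytes(4500) = 36000 bytes of counters per histogram. This linear growth is one motivation for the exponential buckets mentioned earlier in this review.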

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -131,7 +133,12 @@ public JobService getJobService(BigQueryOptions options) {
 
   @Override
   public DatasetService getDatasetService(BigQueryOptions options) {
-    return new DatasetServiceImpl(options);
+    return new DatasetServiceImpl(options, null);
+  }
+
+  @Override
+  public DatasetService getDatasetService(BigQueryOptions options, Histogram histogram) {

Review comment:
       I don't think we should be passing in the histogram to the factory for building the service. Couldn't we just instantiate it inside the call to newBigQueryClient()?
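A sketch of the suggested shape, in which every type is a hypothetical stand-in rather than Beam's real BigQueryServices API: the factory keeps its original single-argument getDatasetService(options), and the service creates and owns its latency recorder at the point where it builds its client. Test fakes such as FakeBigQueryServices would then have no extra overload to implement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-ins illustrating where the latency recorder is created. */
final class FactorySketch {
  interface Options {}

  /** Minimal stand-in for the Histogram or metrics sink that collects request latencies. */
  static final class LatencyRecorder {
    private final List<Double> samples = Collections.synchronizedList(new ArrayList<>());

    void record(double millis) {
      samples.add(millis);
    }

    int size() {
      return samples.size();
    }
  }

  interface DatasetService {
    void insertAll(); // stand-in for a real streaming-insert RPC

    LatencyRecorder latencies();
  }

  /** The factory signature stays as it was before the PR: no histogram parameter. */
  interface Services {
    DatasetService getDatasetService(Options options);
  }

  static final class ServicesImpl implements Services {
    @Override
    public DatasetService getDatasetService(Options options) {
      return new DatasetServiceImpl(options);
    }
  }

  static final class DatasetServiceImpl implements DatasetService {
    // Created by the service itself, analogous to creating it while building the client.
    private final LatencyRecorder latencies = new LatencyRecorder();

    DatasetServiceImpl(Options options) {
      // A real implementation would build its HTTP client here and install the
      // latency-recording request initializer on it (hypothetical).
    }

    @Override
    public void insertAll() {
      long startNanos = System.nanoTime();
      // ... the HTTP call would happen here ...
      latencies.record((System.nanoTime() - startNanos) / 1e6);
    }

    @Override
    public LatencyRecorder latencies() {
      return latencies;
    }
  }
}
```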



