You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/13 22:48:44 UTC

[beam] branch master updated: Add ability for HotKeyLogger to log a key.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e2d31  Add ability for HotKeyLogger to log a key.
     new 7d150b0  Merge pull request #12984 from rohdesamuel/beam-10994-shared-2
e7e2d31 is described below

commit e7e2d31bb1e4dcb6f259d48428271c57404d3a30
Author: Sam R <ro...@gmail.com>
AuthorDate: Wed Sep 30 13:42:47 2020 -0700

    Add ability for HotKeyLogger to log a key.
---
 .../beam/runners/dataflow/worker/HotKeyLogger.java | 31 ++++++----
 .../runners/dataflow/worker/HotKeyLoggerTest.java  | 68 +++++++++++-----------
 2 files changed, 55 insertions(+), 44 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
index 473c89b..ad3a882 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import com.google.api.client.util.Clock;
-import java.text.MessageFormat;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -50,7 +49,27 @@ public class HotKeyLogger {
     if (isThrottled()) {
       return;
     }
-    LOG.warn(getHotKeyMessage(userStepName, TimeUtil.toCloudDuration(hotKeyAge)));
+    LOG.warn(
+        "A hot key was detected in step '{}' with age of '{}'. This is "
+            + "a symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.",
+        userStepName,
+        TimeUtil.toCloudDuration(hotKeyAge));
+  }
+
+  /** Logs a detection of the hot key every 5 minutes with the given key. */
+  public void logHotKeyDetection(String userStepName, Duration hotKeyAge, Object hotkey) {
+    if (isThrottled()) {
+      return;
+    }
+
+    LOG.warn(
+        "A hot key '{}' was detected in step '{}' with age of '{}'. This is "
+            + "a symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.",
+        hotkey,
+        userStepName,
+        TimeUtil.toCloudDuration(hotKeyAge));
   }
 
   /**
@@ -66,12 +85,4 @@ public class HotKeyLogger {
     prevHotKeyDetectionLogMs = nowMs;
     return false;
   }
-
-  protected String getHotKeyMessage(String userStepName, String hotKeyAge) {
-    return MessageFormat.format(
-        "A hot key was detected in step ''{0}'' with age of ''{1}''. This is"
-            + " a symptom of key distribution being skewed. To fix, please inspect your data and "
-            + "pipeline to ensure that elements are evenly distributed across your key space.",
-        userStepName, hotKeyAge);
-  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java
index 0f20584..1049f2b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java
@@ -17,31 +17,26 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 import com.google.api.client.testing.http.FixedClock;
 import com.google.api.client.util.Clock;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.joda.time.Duration;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({HotKeyLoggerTest.class, LoggerFactory.class})
 public class HotKeyLoggerTest {
+  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(HotKeyLogger.class);
+
   private FixedClock clock;
 
   @Before
@@ -50,18 +45,42 @@ public class HotKeyLoggerTest {
   }
 
   @Test
-  public void correctHotKeyMessage() {
+  public void correctHotKeyMessageWithoutKey() {
     HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
 
-    assertFalse(hotKeyLogger.isThrottled());
-    String m = hotKeyLogger.getHotKeyMessage("TEST_STEP_ID", "1s");
+    hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardSeconds(1));
     assertTrue(hotKeyLogger.isThrottled());
 
-    assertEquals(
+    expectedLogs.verifyWarn(
         "A hot key was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
             + "symptom of key distribution being skewed. To fix, please inspect your data and "
-            + "pipeline to ensure that elements are evenly distributed across your key space.",
-        m);
+            + "pipeline to ensure that elements are evenly distributed across your key space.");
+  }
+
+  @Test
+  public void correctHotKeyMessageWithKey() {
+    HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
+
+    hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardSeconds(1), "my key");
+    assertTrue(hotKeyLogger.isThrottled());
+
+    expectedLogs.verifyWarn(
+        "A hot key 'my key' was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+            + "symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.");
+  }
+
+  @Test
+  public void correctHotKeyMessageWithNullKey() {
+    HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
+
+    hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardSeconds(1), null);
+    assertTrue(hotKeyLogger.isThrottled());
+
+    expectedLogs.verifyWarn(
+        "A hot key 'null' was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+            + "symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.");
   }
 
   @Test
@@ -83,23 +102,4 @@ public class HotKeyLoggerTest {
     assertFalse(hotKeyLogger.isThrottled());
     assertTrue(hotKeyLogger.isThrottled());
   }
-
-  @Test
-  @SuppressWarnings("Slf4jIllegalPassedClass")
-  public void logsHotKeyMessage() {
-    mockStatic(LoggerFactory.class);
-    Logger logger = mock(Logger.class);
-    when(LoggerFactory.getLogger(any(Class.class))).thenReturn(logger);
-
-    HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
-
-    // Logs because not throttled.
-    hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardHours(1));
-
-    // Does not log because throttled.
-    hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardHours(1));
-
-    // Only logs once because of throttling.
-    verify(logger, Mockito.times(1)).warn(anyString());
-  }
 }