You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/13 22:48:44 UTC
[beam] branch master updated: Add ability for HotKeyLogger to log a
key.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e7e2d31 Add ability for HotKeyLogger to log a key.
new 7d150b0 Merge pull request #12984 from rohdesamuel/beam-10994-shared-2
e7e2d31 is described below
commit e7e2d31bb1e4dcb6f259d48428271c57404d3a30
Author: Sam R <ro...@gmail.com>
AuthorDate: Wed Sep 30 13:42:47 2020 -0700
Add ability for HotKeyLogger to log a key.
---
.../beam/runners/dataflow/worker/HotKeyLogger.java | 31 ++++++----
.../runners/dataflow/worker/HotKeyLoggerTest.java | 68 +++++++++++-----------
2 files changed, 55 insertions(+), 44 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
index 473c89b..ad3a882 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.dataflow.worker;
import com.google.api.client.util.Clock;
-import java.text.MessageFormat;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -50,7 +49,27 @@ public class HotKeyLogger {
if (isThrottled()) {
return;
}
- LOG.warn(getHotKeyMessage(userStepName, TimeUtil.toCloudDuration(hotKeyAge)));
+ LOG.warn(
+ "A hot key was detected in step '{}' with age of '{}'. This is "
+ + "a symptom of key distribution being skewed. To fix, please inspect your data and "
+ + "pipeline to ensure that elements are evenly distributed across your key space.",
+ userStepName,
+ TimeUtil.toCloudDuration(hotKeyAge));
+ }
+
+ /** Logs a detection of the hot key every 5 minutes with the given key. */
+ public void logHotKeyDetection(String userStepName, Duration hotKeyAge, Object hotkey) {
+ if (isThrottled()) {
+ return;
+ }
+
+ LOG.warn(
+ "A hot key '{}' was detected in step '{}' with age of '{}'. This is "
+ + "a symptom of key distribution being skewed. To fix, please inspect your data and "
+ + "pipeline to ensure that elements are evenly distributed across your key space.",
+ hotkey,
+ userStepName,
+ TimeUtil.toCloudDuration(hotKeyAge));
}
/**
@@ -66,12 +85,4 @@ public class HotKeyLogger {
prevHotKeyDetectionLogMs = nowMs;
return false;
}
-
- protected String getHotKeyMessage(String userStepName, String hotKeyAge) {
- return MessageFormat.format(
- "A hot key was detected in step ''{0}'' with age of ''{1}''. This is"
- + " a symptom of key distribution being skewed. To fix, please inspect your data and "
- + "pipeline to ensure that elements are evenly distributed across your key space.",
- userStepName, hotKeyAge);
- }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java
index 0f20584..1049f2b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java
@@ -17,31 +17,26 @@
*/
package org.apache.beam.runners.dataflow.worker;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.when;
import com.google.api.client.testing.http.FixedClock;
import com.google.api.client.util.Clock;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.joda.time.Duration;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({HotKeyLoggerTest.class, LoggerFactory.class})
public class HotKeyLoggerTest {
+ @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(HotKeyLogger.class);
+
private FixedClock clock;
@Before
@@ -50,18 +45,42 @@ public class HotKeyLoggerTest {
}
@Test
- public void correctHotKeyMessage() {
+ public void correctHotKeyMessageWithoutKey() {
HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
- assertFalse(hotKeyLogger.isThrottled());
- String m = hotKeyLogger.getHotKeyMessage("TEST_STEP_ID", "1s");
+ hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardSeconds(1));
assertTrue(hotKeyLogger.isThrottled());
- assertEquals(
+ expectedLogs.verifyWarn(
"A hot key was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+ "symptom of key distribution being skewed. To fix, please inspect your data and "
- + "pipeline to ensure that elements are evenly distributed across your key space.",
- m);
+ + "pipeline to ensure that elements are evenly distributed across your key space.");
+ }
+
+ @Test
+ public void correctHotKeyMessageWithKey() {
+ HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
+
+ hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardSeconds(1), "my key");
+ assertTrue(hotKeyLogger.isThrottled());
+
+ expectedLogs.verifyWarn(
+ "A hot key 'my key' was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+ + "symptom of key distribution being skewed. To fix, please inspect your data and "
+ + "pipeline to ensure that elements are evenly distributed across your key space.");
+ }
+
+ @Test
+ public void correctHotKeyMessageWithNullKey() {
+ HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
+
+ hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardSeconds(1), null);
+ assertTrue(hotKeyLogger.isThrottled());
+
+ expectedLogs.verifyWarn(
+ "A hot key 'null' was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+ + "symptom of key distribution being skewed. To fix, please inspect your data and "
+ + "pipeline to ensure that elements are evenly distributed across your key space.");
}
@Test
@@ -83,23 +102,4 @@ public class HotKeyLoggerTest {
assertFalse(hotKeyLogger.isThrottled());
assertTrue(hotKeyLogger.isThrottled());
}
-
- @Test
- @SuppressWarnings("Slf4jIllegalPassedClass")
- public void logsHotKeyMessage() {
- mockStatic(LoggerFactory.class);
- Logger logger = mock(Logger.class);
- when(LoggerFactory.getLogger(any(Class.class))).thenReturn(logger);
-
- HotKeyLogger hotKeyLogger = new HotKeyLogger(clock);
-
- // Logs because not throttled.
- hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardHours(1));
-
- // Does not log because throttled.
- hotKeyLogger.logHotKeyDetection("TEST_STEP_ID", Duration.standardHours(1));
-
- // Only logs once because of throttling.
- verify(logger, Mockito.times(1)).warn(anyString());
- }
}