You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/01 17:25:21 UTC
[geode] branch support/1.14 updated: GEODE-8965: Support Redis-style OOM error message (#6085) (#6213)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new b58bfc5 GEODE-8965: Support Redis-style OOM error message (#6085) (#6213)
b58bfc5 is described below
commit b58bfc50e70973ef56daff95bed8c5bbfc02d43c
Author: Ray Ingles <ri...@pivotal.io>
AuthorDate: Thu Apr 1 13:23:57 2021 -0400
GEODE-8965: Support Redis-style OOM error message (#6085) (#6213)
- Allow execution of Redis commands in low-memory conditions
- Test with large key size to guarantee OOM error
- In tests use multiple key sizes to pack used memory more tightly
- Force multiple GCs as memory is filled; add a memory-pressure thread in tests
- Add memory pressure to del and expire tests
Co-authored-by: Ray Ingles <ri...@vmware.com>
(cherry picked from commit 08da3aaa1dd3dd5dd212353ecead32819e584c62)
---
.../internal/cache/control/HeapMemoryMonitor.java | 3 +-
.../cache/execute/AllowExecutionInLowMemory.java | 23 ++
.../apache/geode/redis/OutOfMemoryDUnitTest.java | 237 +++++++++++++++++++++
.../geode/redis/internal/RedisConstants.java | 2 +
.../redis/internal/executor/RedisResponse.java | 4 +
.../executor/SingleResultRedisFunction.java | 4 +-
.../internal/netty/ExecutionHandlerContext.java | 3 +
7 files changed, 273 insertions(+), 3 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index 70ded46..72d07bd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
import org.apache.geode.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
+import org.apache.geode.internal.cache.execute.AllowExecutionInLowMemory;
import org.apache.geode.internal.statistics.GemFireStatSampler;
import org.apache.geode.internal.statistics.LocalStatListener;
import org.apache.geode.internal.statistics.StatisticsManager;
@@ -710,7 +711,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
public LowMemoryException createLowMemoryIfNeeded(Function function,
Set<? extends DistributedMember> memberSet) {
- if (function.optimizeForWrite()
+ if (function.optimizeForWrite() && !(function instanceof AllowExecutionInLowMemory)
&& !MemoryThresholds.isLowMemoryExceptionDisabled()) {
Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
if (!criticalMembersFrom.isEmpty()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java
new file mode 100644
index 0000000..a5dadb6
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+
+/**
+ * An internal marker interface used to allow functions to run in low-memory conditions.
+ *
+ * <p>
+ * The interface declares no methods of its own; it is only tested for with {@code instanceof}.
+ * {@code HeapMemoryMonitor.createLowMemoryIfNeeded} skips its critical-member check for any
+ * function that implements this interface, even when the function reports
+ * {@code optimizeForWrite()} as {@code true}.
+ */
+public interface AllowExecutionInLowMemory extends InternalFunction<Object[]> {
+
+}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java
new file mode 100644
index 0000000..cd1677e
--- /dev/null
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+/**
+ * Distributed test for Redis-style out-of-memory behaviour.
+ *
+ * <p>
+ * Two Redis-enabled servers are started with a critical heap percentage of 5%. Each test fills
+ * memory through {@code jedis2} while a background thread keeps writing through {@code jedis1},
+ * and then checks how the server answers once the critical threshold has been crossed: writes
+ * must fail with an "OOM" error, while deletes and key expiration must keep working.
+ */
+public class OutOfMemoryDUnitTest {
+
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static final String expectedEx = "Member: .*? above .*? critical threshold";
+  private static final String FILLER_KEY = "fillerKey-";
+  private static final String PRESSURE_KEY = "pressureKey-";
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int KEY_TTL_SECONDS = 10;
+  private static final int MAX_ITERATION_COUNT = 4000;
+  private static final int LARGE_VALUE_SIZE = 128 * 1024;
+  private static final int PRESSURE_VALUE_SIZE = 4 * 1024;
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+
+  private static MemberVM server1;
+  private static MemberVM server2;
+
+  private static Thread memoryPressureThread;
+
+  /**
+   * Starts a locator and two Redis servers, lowers the critical heap percentage on both servers to
+   * 5%, and opens one Jedis connection per server.
+   */
+  @BeforeClass
+  public static void classSetup() {
+    IgnoredException.addIgnoredException(expectedEx);
+
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+
+    Properties serverProperties = new Properties();
+    server1 = clusterStartUp.startRedisVM(1, serverProperties, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, serverProperties, locator.getPort());
+
+    server1.getVM().invoke(() -> RedisClusterStartupRule.getCache().getResourceManager()
+        .setCriticalHeapPercentage(5.0F));
+    server2.getVM().invoke(() -> RedisClusterStartupRule.getCache().getResourceManager()
+        .setCriticalHeapPercentage(5.0F));
+
+    int redisServerPort1 = clusterStartUp.getRedisPort(1);
+    int redisServerPort2 = clusterStartUp.getRedisPort(2);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+  }
+
+  /** Removes all keys before each test so that every test starts from an empty data set. */
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+  }
+
+  /** Closes both client connections and stops both servers. */
+  @AfterClass
+  public static void tearDown() {
+    jedis1.disconnect();
+    jedis2.disconnect();
+
+    server1.stop();
+    server2.stop();
+  }
+
+  /**
+   * Once memory has been filled, a further large SET must be rejected with an error whose message
+   * contains "OOM".
+   */
+  @Test
+  public void shouldReturnOOMError_forWriteOperations_whenThresholdReached()
+      throws InterruptedException {
+    IgnoredException.addIgnoredException(expectedEx);
+    IgnoredException.addIgnoredException("LowMemoryException");
+
+    startMemoryPressure();
+    try {
+      fillMemory(jedis2, false);
+
+      assertThatThrownBy(() -> jedis2.set("oneMoreKey", makeLongStringValue(2 * LARGE_VALUE_SIZE)))
+          .hasMessageContaining("OOM");
+    } finally {
+      // Always stop the background writer, even when an assertion above fails.
+      stopMemoryPressure();
+    }
+  }
+
+  /** Once memory has been filled, DEL must still be accepted. */
+  @Test
+  public void shouldAllowDeleteOperations_afterThresholdReached() throws InterruptedException {
+    IgnoredException.addIgnoredException(expectedEx);
+    IgnoredException.addIgnoredException("LowMemoryException");
+
+    startMemoryPressure();
+    try {
+      fillMemory(jedis2, false);
+
+      assertThatNoException().isThrownBy(() -> jedis2.del(FILLER_KEY + 1));
+    } finally {
+      // Always stop the background writer, even when an assertion above fails.
+      stopMemoryPressure();
+    }
+  }
+
+  /**
+   * Once memory has been filled with expiring keys, expiration must still happen: TTL on a filler
+   * key eventually reports -2, the Redis reply for a key that does not exist.
+   */
+  @Test
+  public void shouldAllowExpiration_afterThresholdReached() throws InterruptedException {
+    IgnoredException.addIgnoredException(expectedEx);
+    IgnoredException.addIgnoredException("LowMemoryException");
+
+    startMemoryPressure();
+    try {
+      fillMemory(jedis2, true);
+
+      await().untilAsserted(() -> {
+        assertThat(jedis2.ttl(FILLER_KEY + 1)).isEqualTo(-2);
+      });
+    } finally {
+      // Always stop the background writer, even when an assertion above fails.
+      stopMemoryPressure();
+    }
+  }
+
+  // TODO: test that write operations become allowed after memory has dropped
+  // below critical levels. Difficult to do right now because of vagaries of the
+  // Java garbage collector.
+
+  /** Starts the background thread that keeps writing through {@code jedis1}. */
+  private static void startMemoryPressure() {
+    memoryPressureThread = new Thread(makeMemoryPressureRunnable());
+    memoryPressureThread.start();
+  }
+
+  /**
+   * Interrupts the background writer and waits for it to finish, so that it can no longer use
+   * {@code jedis1} concurrently with the next test's setup.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  private static void stopMemoryPressure() throws InterruptedException {
+    memoryPressureThread.interrupt();
+    memoryPressureThread.join();
+  }
+
+  /**
+   * Repeatedly forces a GC on both servers and then writes filler keys until the server rejects a
+   * write with an OOM error.
+   *
+   * @param jedis the connection used to write the filler keys
+   * @param withExpiration whether the filler keys are written with a TTL
+   */
+  private void fillMemory(Jedis jedis, boolean withExpiration) {
+    // NOTE(review): valueSize is halved every round but only controls the number of rounds; every
+    // round writes values of LARGE_VALUE_SIZE. The halving suggests smaller values were intended
+    // for later rounds, but addMultipleKeys requires an OOM within MAX_ITERATION_COUNT writes,
+    // which small values may not trigger. Confirm the intent before changing the value size.
+    String valueString = makeLongStringValue(LARGE_VALUE_SIZE);
+
+    for (int valueSize = LARGE_VALUE_SIZE; valueSize > 1; valueSize /= 2) {
+      forceGC(); // Helps ensure we really do fill all available memory
+      addMultipleKeys(jedis, valueString, withExpiration);
+    }
+  }
+
+  /**
+   * Writes up to {@code MAX_ITERATION_COUNT} filler keys and asserts that the server rejected one
+   * of them with an "OOM command not allowed" error before the limit was reached.
+   *
+   * @param jedis the connection used to write the keys
+   * @param valueString the value stored under every key
+   * @param withExpiration whether the keys are written with a TTL
+   */
+  private void addMultipleKeys(Jedis jedis, String valueString, boolean withExpiration) {
+    // An AtomicInteger is used because the lambda below must update the counter, and a lambda can
+    // only capture local variables that are effectively final.
+    AtomicInteger count = new AtomicInteger();
+
+    Throwable thrown = catchThrowable(() -> {
+      for (count.set(0); count.get() < MAX_ITERATION_COUNT; count.incrementAndGet()) {
+        setRedisKeyAndValue(jedis, withExpiration, valueString, count.get());
+      }
+    });
+
+    assertThat(thrown)
+        .isInstanceOf(Exception.class)
+        .hasMessageContaining("OOM command not allowed");
+
+    assertThat(count.get()).isLessThan(MAX_ITERATION_COUNT);
+  }
+
+  /**
+   * Stores {@code valueString} under the filler key with the given number, using SETEX with
+   * {@code KEY_TTL_SECONDS} when {@code withExpiration} is set and a plain SET otherwise.
+   */
+  private void setRedisKeyAndValue(Jedis jedis, boolean withExpiration, String valueString,
+      int keyNumber) {
+    if (withExpiration) {
+      jedis.setex(FILLER_KEY + keyNumber, KEY_TTL_SECONDS, valueString);
+    } else {
+      jedis.set(FILLER_KEY + keyNumber, valueString);
+    }
+  }
+
+  /**
+   * Builds a string consisting of {@code requestedSize} copies of the character 'a'.
+   *
+   * @param requestedSize the length of the string to build
+   * @return the generated string
+   */
+  private static String makeLongStringValue(int requestedSize) {
+    char[] largeCharData = new char[requestedSize];
+    Arrays.fill(largeCharData, 'a');
+    return new String(largeCharData);
+  }
+
+  /**
+   * Creates the task run by the memory-pressure thread: it keeps writing new keys through
+   * {@code jedis1} until the thread running it is interrupted.
+   */
+  private static Runnable makeMemoryPressureRunnable() {
+    String pressureValue = makeLongStringValue(PRESSURE_VALUE_SIZE);
+
+    return () -> {
+      int i = 0;
+      // The interrupt flag is the only stop signal; it is set by stopMemoryPressure().
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          jedis1.set(PRESSURE_KEY + i, pressureValue);
+        } catch (JedisException ignored) {
+          // Ignore, keep trying to fill memory
+        }
+        i++;
+      }
+    };
+  }
+
+  /** Requests a garbage collection on both servers. */
+  private void forceGC() {
+    server1.getVM().invoke(() -> {
+      Runtime.getRuntime().gc();
+    });
+    server2.getVM().invoke(() -> {
+      Runtime.getRuntime().gc();
+    });
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
index 0a86362..313969c 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
@@ -49,6 +49,8 @@ public class RedisConstants {
public static final String ERROR_SYNTAX = "syntax error";
public static final String ERROR_INVALID_EXPIRE_TIME = "invalid expire time in set";
public static final String ERROR_NOT_A_VALID_FLOAT = "value is not a valid float";
+ public static final String ERROR_OOM_COMMAND_NOT_ALLOWED =
+ "command not allowed when used memory > 'maxmemory'";
public static final String ERROR_UNKNOWN_SLOWLOG_SUBCOMMAND =
"Unknown subcommand or wrong number of arguments for '%s'. Try SLOWLOG HELP.";
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index f6e909e..44c9f2d 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -121,6 +121,10 @@ public class RedisResponse {
return new RedisResponse((buffer) -> Coder.getErrorResponse(buffer, error));
}
+ public static RedisResponse oom(String error) {
+ return new RedisResponse((bba) -> Coder.getOOMResponse(bba, error));
+ }
+
public static RedisResponse customError(String error) {
return new RedisResponse((buffer) -> Coder.getCustomErrorResponse(buffer, error));
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
index ed1ae1a..5cbf5e7 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
@@ -18,12 +18,12 @@ package org.apache.geode.redis.internal.executor;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.internal.cache.execute.AllowExecutionInLowMemory;
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisKey;
-public abstract class SingleResultRedisFunction implements InternalFunction<Object[]> {
+public abstract class SingleResultRedisFunction implements AllowExecutionInLowMemory {
private static final long serialVersionUID = 3239452234149879302L;
private final transient PartitionedRegion partitionedRegion;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 4ca7359..3c9056b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -233,6 +234,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
response = RedisResponse.error(cause.getMessage());
} else if (cause instanceof RedisDataTypeMismatchException) {
response = RedisResponse.wrongType(cause.getMessage());
+ } else if (cause instanceof LowMemoryException) {
+ response = RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
} else if (cause instanceof DecoderException
&& cause.getCause() instanceof RedisCommandParserException) {
response = RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE);