You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/01 17:25:21 UTC

[geode] branch support/1.14 updated: GEODE-8965: Support Redis-style OOM error message (#6085) (#6213)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new b58bfc5  GEODE-8965:  Support Redis-style OOM error message (#6085) (#6213)
b58bfc5 is described below

commit b58bfc50e70973ef56daff95bed8c5bbfc02d43c
Author: Ray Ingles <ri...@pivotal.io>
AuthorDate: Thu Apr 1 13:23:57 2021 -0400

    GEODE-8965:  Support Redis-style OOM error message (#6085) (#6213)
    
    - Allow execution of Redis commands in low-memory conditions
    - Test with large key size to guarantee OOM error
    - In tests use multiple key sizes to pack used memory more tightly
    - force multiple GCs as memory is filled, add memory-pressure thread in tests
    - Add memory pressure to del and expire tests
    
    Co-authored-by: Ray Ingles <ri...@vmware.com>
    (cherry picked from commit 08da3aaa1dd3dd5dd212353ecead32819e584c62)
---
 .../internal/cache/control/HeapMemoryMonitor.java  |   3 +-
 .../cache/execute/AllowExecutionInLowMemory.java   |  23 ++
 .../apache/geode/redis/OutOfMemoryDUnitTest.java   | 237 +++++++++++++++++++++
 .../geode/redis/internal/RedisConstants.java       |   2 +
 .../redis/internal/executor/RedisResponse.java     |   4 +
 .../executor/SingleResultRedisFunction.java        |   4 +-
 .../internal/netty/ExecutionHandlerContext.java    |   3 +
 7 files changed, 273 insertions(+), 3 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index 70ded46..72d07bd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
 import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
 import org.apache.geode.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
+import org.apache.geode.internal.cache.execute.AllowExecutionInLowMemory;
 import org.apache.geode.internal.statistics.GemFireStatSampler;
 import org.apache.geode.internal.statistics.LocalStatListener;
 import org.apache.geode.internal.statistics.StatisticsManager;
@@ -710,7 +711,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
 
   public LowMemoryException createLowMemoryIfNeeded(Function function,
       Set<? extends DistributedMember> memberSet) {
-    if (function.optimizeForWrite()
+    if (function.optimizeForWrite() && !(function instanceof AllowExecutionInLowMemory)
         && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
       Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
       if (!criticalMembersFrom.isEmpty()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java
new file mode 100644
index 0000000..a5dadb6
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+
+/**
+ * An internal marker interface that lets a function run while memory is low.
+ * {@code HeapMemoryMonitor#createLowMemoryIfNeeded} skips its critical-member check for any
+ * function that implements it.
+ */
+public interface AllowExecutionInLowMemory extends InternalFunction<Object[]> {}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java
new file mode 100644
index 0000000..cd1677e
--- /dev/null
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class OutOfMemoryDUnitTest {
+
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static final String expectedEx = "Member: .*? above .*? critical threshold";
+  private static final String FILLER_KEY = "fillerKey-";
+  private static final String PRESSURE_KEY = "pressureKey-";
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int KEY_TTL_SECONDS = 10;
+  private static final int MAX_ITERATION_COUNT = 4000;
+  private static final int LARGE_VALUE_SIZE = 128 * 1024;
+  private static final int PRESSURE_VALUE_SIZE = 4 * 1024;
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+
+  private static MemberVM server1;
+  private static MemberVM server2;
+
+  private static Thread memoryPressureThread;
+
+  @BeforeClass
+  public static void classSetup() {
+    IgnoredException.addIgnoredException(expectedEx);
+
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+
+    Properties serverProperties = new Properties();
+    server1 = clusterStartUp.startRedisVM(1, serverProperties, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, serverProperties, locator.getPort());
+
+    server1.getVM().invoke(() -> RedisClusterStartupRule.getCache().getResourceManager()
+        .setCriticalHeapPercentage(5.0F));
+    server2.getVM().invoke(() -> RedisClusterStartupRule.getCache().getResourceManager()
+        .setCriticalHeapPercentage(5.0F));
+
+    int redisServerPort1 = clusterStartUp.getRedisPort(1);
+    int redisServerPort2 = clusterStartUp.getRedisPort(2);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis1.disconnect();
+    jedis2.disconnect();
+
+    server1.stop();
+    server2.stop();
+  }
+
+  @Test
+  public void shouldReturnOOMError_forWriteOperations_whenThresholdReached()
+      throws InterruptedException {
+    IgnoredException.addIgnoredException(expectedEx);
+    IgnoredException.addIgnoredException("LowMemoryException");
+    // Background writer that keeps adding small keys through jedis1 while this test runs
+    memoryPressureThread = new Thread(makeMemoryPressureRunnable());
+    memoryPressureThread.start();
+    try {
+      fillMemory(jedis2, false);
+      assertThatThrownBy(() -> jedis2.set("oneMoreKey", makeLongStringValue(2 * LARGE_VALUE_SIZE)))
+          .hasMessageContaining("OOM");
+    } finally { // always stop the pressure thread, even when an assertion above fails
+      memoryPressureThread.interrupt();
+      memoryPressureThread.join();
+    }
+  }
+
+  @Test
+  public void shouldAllowDeleteOperations_afterThresholdReached() throws InterruptedException {
+    IgnoredException.addIgnoredException(expectedEx);
+    IgnoredException.addIgnoredException("LowMemoryException");
+    // Background writer that keeps adding small keys through jedis1 while this test runs
+    memoryPressureThread = new Thread(makeMemoryPressureRunnable());
+    memoryPressureThread.start();
+    try {
+      fillMemory(jedis2, false);
+      assertThatNoException().isThrownBy(() -> jedis2.del(FILLER_KEY + 1));
+    } finally { // always stop the pressure thread, even when an assertion above fails
+      memoryPressureThread.interrupt();
+      memoryPressureThread.join();
+    }
+  }
+
+  @Test
+  public void shouldAllowExpiration_afterThresholdReached() throws InterruptedException {
+    IgnoredException.addIgnoredException(expectedEx);
+    IgnoredException.addIgnoredException("LowMemoryException");
+    // Background writer that keeps adding small keys through jedis1 while this test runs
+    memoryPressureThread = new Thread(makeMemoryPressureRunnable());
+    memoryPressureThread.start();
+    try {
+      fillMemory(jedis2, true);
+      await().untilAsserted(() -> {
+        assertThat(jedis2.ttl(FILLER_KEY + 1)).isEqualTo(-2); // -2 = key no longer exists
+      });
+    } finally { // always stop the pressure thread, even when an assertion above fails
+      memoryPressureThread.interrupt();
+      memoryPressureThread.join();
+    }
+  }
+
+  // TODO: test that write operations become allowed after memory has dropped
+  // below critical levels. Difficult to do right now because of vagaries of the
+  // Java garbage collector.
+
+  /**
+   * Fills memory until writes are rejected with OOM. The value size is halved on every pass so
+   * that progressively smaller values can pack the remaining heap more tightly.
+   */
+  private void fillMemory(Jedis jedis, boolean withExpiration) {
+    for (int valueSize = LARGE_VALUE_SIZE; valueSize > 1; valueSize /= 2) {
+      forceGC(); // Helps ensure we really do fill all available memory
+      // Build the value from valueSize (not LARGE_VALUE_SIZE) so each pass really shrinks it.
+      addMultipleKeys(jedis, makeLongStringValue(valueSize), withExpiration);
+    }
+  }
+
+  private void addMultipleKeys(Jedis jedis, String valueString, boolean withExpiration) {
+    // A mutable counter is needed because the lambda below cannot reassign a captured local
+    AtomicInteger count = new AtomicInteger();
+
+    Throwable thrown = catchThrowable(() -> {
+      for (count.set(0); count.get() < MAX_ITERATION_COUNT; count.incrementAndGet()) {
+        setRedisKeyAndValue(jedis, withExpiration, valueString, count.get());
+      }
+    });
+    // Writing must have been cut short by an OOM error before reaching the iteration cap
+    assertThat(thrown)
+        .isInstanceOf(Exception.class)
+        .hasMessageContaining("OOM command not allowed");
+
+    assertThat(count.get()).isLessThan(MAX_ITERATION_COUNT);
+  }
+
+  private void setRedisKeyAndValue(Jedis jedis, boolean withExpiration, String valueString,
+      int keyNumber) {
+    if (withExpiration) {
+      jedis.setex(FILLER_KEY + keyNumber, KEY_TTL_SECONDS, valueString);
+    } else {
+      jedis.set(FILLER_KEY + keyNumber, valueString);
+    }
+  }
+
+  private static String makeLongStringValue(int requestedSize) {
+    char[] largeCharData = new char[requestedSize];
+    Arrays.fill(largeCharData, 'a');
+    return new String(largeCharData);
+  }
+
+  private static Runnable makeMemoryPressureRunnable() {
+    return new Runnable() {
+      boolean running = true;
+      String pressureValue = makeLongStringValue(PRESSURE_VALUE_SIZE);
+
+      @Override
+      public void run() {
+        int i = 0;
+        while (running) {
+          if (Thread.currentThread().isInterrupted()) {
+            running = false;
+            break;
+          }
+          try {
+            jedis1.set(PRESSURE_KEY + i, pressureValue);
+          } catch (JedisException je) {
+            // Ignore, keep trying to fill memory
+          }
+          i++;
+        }
+      }
+    };
+  }
+
+  /**
+   * Requests a garbage collection inside each of the two Redis server VMs.
+   */
+  private void forceGC() {
+    for (MemberVM server : Arrays.asList(server1, server2)) {
+      server.getVM().invoke(() -> Runtime.getRuntime().gc());
+    }
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
index 0a86362..313969c 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
@@ -49,6 +49,8 @@ public class RedisConstants {
   public static final String ERROR_SYNTAX = "syntax error";
   public static final String ERROR_INVALID_EXPIRE_TIME = "invalid expire time in set";
   public static final String ERROR_NOT_A_VALID_FLOAT = "value is not a valid float";
+  public static final String ERROR_OOM_COMMAND_NOT_ALLOWED =
+      "command not allowed when used memory > 'maxmemory'";
 
   public static final String ERROR_UNKNOWN_SLOWLOG_SUBCOMMAND =
       "Unknown subcommand or wrong number of arguments for '%s'. Try SLOWLOG HELP.";
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index f6e909e..44c9f2d 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -121,6 +121,10 @@ public class RedisResponse {
     return new RedisResponse((buffer) -> Coder.getErrorResponse(buffer, error));
   }
 
+  public static RedisResponse oom(String error) {
+    return new RedisResponse((bba) -> Coder.getOOMResponse(bba, error));
+  }
+
   public static RedisResponse customError(String error) {
     return new RedisResponse((buffer) -> Coder.getCustomErrorResponse(buffer, error));
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
index ed1ae1a..5cbf5e7 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
@@ -18,12 +18,12 @@ package org.apache.geode.redis.internal.executor;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.internal.cache.execute.AllowExecutionInLowMemory;
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisKey;
 
-public abstract class SingleResultRedisFunction implements InternalFunction<Object[]> {
+public abstract class SingleResultRedisFunction implements AllowExecutionInLowMemory {
 
   private static final long serialVersionUID = 3239452234149879302L;
   private final transient PartitionedRegion partitionedRegion;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 4ca7359..3c9056b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.ForcedDisconnectException;
 import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionInvocationTargetException;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -233,6 +234,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
       response = RedisResponse.error(cause.getMessage());
     } else if (cause instanceof RedisDataTypeMismatchException) {
       response = RedisResponse.wrongType(cause.getMessage());
+    } else if (cause instanceof LowMemoryException) {
+      response = RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
     } else if (cause instanceof DecoderException
         && cause.getCause() instanceof RedisCommandParserException) {
       response = RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE);