You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/04/13 17:04:05 UTC

[geode] branch develop updated: GEODE-4990: Cluster Config StartUp Race Condition (#1730)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d4a3689  GEODE-4990: Cluster Config StartUp Race Condition (#1730)
d4a3689 is described below

commit d4a3689675ce72887b8286c440c3ff2b4481fc5b
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Fri Apr 13 18:03:59 2018 +0100

    GEODE-4990: Cluster Config StartUp Race Condition (#1730)
    
    * GEODE-4990: Cluster Config StartUp Race Condition
    
    - Added unit test for `GetClusterConfigurationFunction`.
    - Added integration tests for `ClusterConfigurationLoader`.
    - Set `InternalLocator.isSharedConfigurationEnabled()` visibility
      as `public`.
    - Set `ClusterConfigurationLoader.requestConfigurationFromOneLocator()`
      visibility as `protected`.
    - `GetClusterConfigurationFunction` now throws an exception if the
      cluster configuration service is not enabled on the locator.
    - `GetClusterConfigurationFunction` now returns null if the cluster
      configuration service is enabled but not yet started. The caller can
      decide whether to retry or fail fast.
---
 .../InternalClusterConfigurationService.java       |   2 +-
 .../distributed/internal/InternalLocator.java      |   2 +-
 .../internal/cache/ClusterConfigurationLoader.java |   2 +-
 .../functions/GetClusterConfigurationFunction.java |  40 ++++---
 .../ClusterConfigurationLoaderIntegrationTest.java |  92 ++++++++++++++-
 .../GetClusterConfigurationFunctionTest.java       | 128 +++++++++++++++++++++
 6 files changed, 244 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java
index ed4e467..d750fd6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java
@@ -547,7 +547,7 @@ public class InternalClusterConfigurationService implements ClusterConfiguration
    * Creates a ConfigurationResponse based on the configRequest, configuration response contains the
    * requested shared configuration This method locks the ClusterConfigurationService
    */
-  public ConfigurationResponse createConfigurationResponse(Set<String> groups) throws IOException {
+  public ConfigurationResponse createConfigurationResponse(Set<String> groups) {
     ConfigurationResponse configResponse = null;
 
     boolean isLocked = lockSharedConfiguration();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 0535277..be5f649 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -179,7 +179,7 @@ public class InternalLocator extends Locator implements ConnectListener {
 
   private volatile Thread restartThread;
 
-  boolean isSharedConfigurationEnabled() {
+  public boolean isSharedConfigurationEnabled() {
     return this.config.getEnableClusterConfiguration();
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
index 1a18270..2ef9b2b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
@@ -303,7 +303,7 @@ public class ClusterConfigurationLoader {
     return response;
   }
 
-  private ConfigurationResponse requestConfigurationFromOneLocator(
+  protected ConfigurationResponse requestConfigurationFromOneLocator(
       InternalDistributedMember locator, Set<String> groups) {
     ConfigurationResponse configResponse = null;
 
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java
index 5b70150..099077c 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java
@@ -12,10 +12,8 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.management.internal.configuration.functions;
 
-import java.io.IOException;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -32,20 +30,36 @@ public class GetClusterConfigurationFunction implements InternalFunction {
 
   @Override
   public void execute(FunctionContext context) {
-    InternalClusterConfigurationService clusterConfigurationService =
-        InternalLocator.getLocator().getSharedConfiguration();
-
     Set<String> groups = (Set<String>) context.getArguments();
+    InternalLocator internalLocator = InternalLocator.getLocator();
+    logger.info("Received request for configuration: {}", groups);
+
+    // Return exception to the caller so startup fails fast.
+    if (!internalLocator.isSharedConfigurationEnabled()) {
+      String errorMessage = "The cluster configuration service is not enabled on this member.";
+      logger.warn(errorMessage);
+      context.getResultSender().lastResult(new IllegalStateException(errorMessage));
+      return;
+    }
 
-    logger.info("Received request for configuration  : {}", groups);
+    // Shared configuration enabled.
+    if (internalLocator.isSharedConfigurationRunning()) {
+      // Cluster configuration is up and running already.
+      InternalClusterConfigurationService clusterConfigurationService =
+          internalLocator.getSharedConfiguration();
 
-    try {
-      ConfigurationResponse response =
-          clusterConfigurationService.createConfigurationResponse(groups);
-      context.getResultSender().lastResult(response);
-    } catch (IOException e) {
-      logger.error("Unable to retrieve the cluster configuration", e);
-      context.getResultSender().lastResult(e);
+      try {
+        ConfigurationResponse response =
+            clusterConfigurationService.createConfigurationResponse(groups);
+        context.getResultSender().lastResult(response);
+      } catch (Exception exception) {
+        logger.warn("Unable to retrieve the cluster configuration", exception);
+        context.getResultSender().lastResult(exception);
+      }
+    } else {
+      // Cluster configuration service is starting up. Return null so callers can decide whether
+      // to fail fast, or wait and retry later.
+      context.getResultSender().lastResult(null);
     }
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java
index 4c026af..25322ca 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java
@@ -12,10 +12,10 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.HashSet;
 import java.util.Map;
@@ -25,8 +25,13 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.config.ClusterConfigurationNotAvailableException;
 import org.apache.geode.management.internal.configuration.domain.Configuration;
 import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse;
 import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -35,25 +40,100 @@ import org.apache.geode.test.junit.rules.LocatorStarterRule;
 
 @Category(IntegrationTest.class)
 public class ClusterConfigurationLoaderIntegrationTest {
+  private ClusterConfigurationLoader loader;
 
   @Rule
-  public LocatorStarterRule locator = new LocatorStarterRule().withAutoStart();
-
-  private ClusterConfigurationLoader loader;
+  public LocatorStarterRule locator = new LocatorStarterRule();
 
   @Before
   public void before() {
-    loader = new ClusterConfigurationLoader();
+    loader = Mockito.spy(new ClusterConfigurationLoader());
+  }
+
+  @Test
+  public void requestConfigurationFromLocatorsShouldThrowExceptionWhenClusterConfigurationServiceIsNotEnabled() {
+    locator.withProperty("enable-cluster-configuration", "false").startLocator();
+
+    Set<InternalDistributedMember> locators = new HashSet<>();
+    locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
+        .getDistributedMember());
+
+    assertThatThrownBy(() -> loader.requestConfigurationFromLocators("", locators))
+        .isInstanceOf(FunctionException.class).hasCauseInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("The cluster configuration service is not enabled on this member.");
+  }
+
+  @Test
+  public void requestConfigurationFromLocatorsShouldThrowExceptionAfterTheSixthNullConfigurationResponse() {
+    CustomAnswer customAnswer = new CustomAnswer(10);
+    Mockito.doAnswer(customAnswer).when(loader).requestConfigurationFromOneLocator(Mockito.any(),
+        Mockito.anySet());
+
+    locator.withProperty("enable-cluster-configuration", "true").startLocator();
+    Set<InternalDistributedMember> locators = new HashSet<>();
+    locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
+        .getDistributedMember());
+
+    assertThatThrownBy(() -> loader.requestConfigurationFromLocators("", locators))
+        .isInstanceOf(ClusterConfigurationNotAvailableException.class)
+        .hasMessageContaining("Unable to retrieve cluster configuration from the locator.");
+
+    assertThat(customAnswer.calls).isEqualTo(6);
+  }
+
+
+  @Test
+  public void requestConfigurationFromLocatorsShouldCorrectlyLoadTheClusterConfiguration()
+      throws Exception {
+    locator.withProperty("enable-cluster-configuration", "true").startLocator();
+
+    Set<InternalDistributedMember> locators = new HashSet<>();
+    locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
+        .getDistributedMember());
+
+    ConfigurationResponse response = loader.requestConfigurationFromLocators("", locators);
+    Map<String, Configuration> configurationMap = response.getRequestedConfiguration();
+    assertThat(configurationMap.size()).isEqualTo(1);
+    assertThat(configurationMap.get("cluster")).isNotNull();
   }
 
   @Test
-  public void requestForClusterConfiguration() throws Exception {
+  public void requestConfigurationFromLocatorsShouldCorrectlyLoadTheClusterConfigurationEvenAfterSeveralRetries()
+      throws Exception {
+    int mockLimit = 6;
+    CustomAnswer customAnswer = new CustomAnswer(mockLimit);
+    Mockito.doAnswer(customAnswer).when(loader).requestConfigurationFromOneLocator(Mockito.any(),
+        Mockito.anySet());
+
+    locator.withProperty("enable-cluster-configuration", "true").startLocator();
     Set<InternalDistributedMember> locators = new HashSet<>();
     locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
         .getDistributedMember());
+
     ConfigurationResponse response = loader.requestConfigurationFromLocators("", locators);
     Map<String, Configuration> configurationMap = response.getRequestedConfiguration();
     assertThat(configurationMap.size()).isEqualTo(1);
     assertThat(configurationMap.get("cluster")).isNotNull();
+    assertThat(customAnswer.calls).isEqualTo(mockLimit);
+  }
+
+  class CustomAnswer implements Answer {
+    public int calls;
+    public int mockLimit;
+
+    public CustomAnswer(int mockLimit) {
+      this.calls = 0;
+      this.mockLimit = mockLimit;
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      calls++;
+      if (calls < mockLimit) {
+        return null;
+      } else {
+        return invocation.callRealMethod();
+      }
+    }
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunctionTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunctionTest.java
new file mode 100644
index 0000000..34a3151
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunctionTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.configuration.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.distributed.internal.InternalClusterConfigurationService;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+@PowerMockIgnore("*.UnitTest")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(InternalLocator.class)
+public class GetClusterConfigurationFunctionTest {
+  private InternalLocator mockedLocator;
+  private FunctionContext mockedFunctionContext;
+  private ResultSender<Object> mockedResultSender;
+  private InternalClusterConfigurationService mockedConfigurationService;
+  private GetClusterConfigurationFunction getClusterConfigurationFunction;
+
+  @Before
+  public void setUp() {
+    mockedResultSender = mock(ResultSender.class);
+    mockedLocator = mock(InternalLocator.class);
+    mockedFunctionContext = mock(FunctionContext.class);
+    mockedConfigurationService = mock(InternalClusterConfigurationService.class);
+    getClusterConfigurationFunction = new GetClusterConfigurationFunction();
+
+    when(mockedLocator.isSharedConfigurationEnabled()).thenReturn(true);
+    when(mockedLocator.isSharedConfigurationRunning()).thenReturn(true);
+    when(mockedFunctionContext.getResultSender()).thenReturn(mockedResultSender);
+    when(mockedFunctionContext.getArguments()).thenReturn(Collections.emptySet());
+
+    PowerMockito.mockStatic(InternalLocator.class);
+    when(InternalLocator.getLocator()).thenReturn(mockedLocator);
+  }
+
+  @Test
+  public void executeShouldReturnIllegalStateExceptionWhenClusterConfigurationServiceIsDisabled() {
+    when(mockedLocator.isSharedConfigurationEnabled()).thenReturn(false);
+    ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
+
+    assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+        .doesNotThrowAnyException();
+    verify(mockedResultSender).lastResult(argumentCaptor.capture());
+    Exception exceptionThrown = argumentCaptor.getValue();
+    assertThat(exceptionThrown).isInstanceOf(IllegalStateException.class)
+        .hasMessage("The cluster configuration service is not enabled on this member.");
+  }
+
+  @Test
+  public void executeShouldReturnExceptionWhenClusterConfigurationServiceIsEnabledButFailuresOccurWhileRetrievingIt() {
+    when(mockedConfigurationService.createConfigurationResponse(any()))
+        .thenThrow(new RuntimeException("Mocked Exception."));
+    when(mockedLocator.getSharedConfiguration()).thenReturn(mockedConfigurationService);
+    ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
+
+    assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+        .doesNotThrowAnyException();
+    verify(mockedResultSender).lastResult(argumentCaptor.capture());
+    Exception exceptionThrown = argumentCaptor.getValue();
+    assertThat(exceptionThrown).isInstanceOf(RuntimeException.class)
+        .hasMessage("Mocked Exception.");
+  }
+
+  @Test
+  public void executeShouldReturnNullWhenClusterConfigurationServiceIsEnabledButNotRunning() {
+    when(mockedLocator.isSharedConfigurationRunning()).thenReturn(false);
+
+    assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+        .doesNotThrowAnyException();
+    verify(mockedResultSender, times(1)).lastResult(null);
+  }
+
+  @Test
+  public void executeShouldReturnTheRequestConfigurationWhenClusterConfigurationServiceIsEnabled() {
+    Set<String> requestedGroups = new HashSet<>(Arrays.asList("group1", "group2"));
+    when(mockedFunctionContext.getArguments()).thenReturn(requestedGroups);
+    when(mockedLocator.getSharedConfiguration()).thenReturn(mockedConfigurationService);
+    ConfigurationResponse mockedResponse = new ConfigurationResponse();
+    when(mockedConfigurationService.createConfigurationResponse(any())).thenReturn(mockedResponse);
+    ArgumentCaptor<ConfigurationResponse> argumentCaptor =
+        ArgumentCaptor.forClass(ConfigurationResponse.class);
+
+    assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+        .doesNotThrowAnyException();
+    verify(mockedResultSender).lastResult(argumentCaptor.capture());
+    verify(mockedConfigurationService, times(1)).createConfigurationResponse(requestedGroups);
+    verify(mockedResultSender, times(1)).lastResult(mockedResponse);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
jinmeiliao@apache.org.