You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/04/13 17:04:05 UTC
[geode] branch develop updated: GEODE-4990: Cluster Config StartUp
Race Condition (#1730)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d4a3689 GEODE-4990: Cluster Config StartUp Race Condition (#1730)
d4a3689 is described below
commit d4a3689675ce72887b8286c440c3ff2b4481fc5b
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Fri Apr 13 18:03:59 2018 +0100
GEODE-4990: Cluster Config StartUp Race Condition (#1730)
* GEODE-4990: Cluster Config StartUp Race Condition
- Added unit test for `GetClusterConfigurationFunction`.
- Added integration tests for `ClusterConfigurationLoader`.
- Set 'InternalLocator.isSharedConfigurationEnabled()' visibility
as `public`.
- Set `ClusterConfigurationLoader.requestConfigurationFromOneLocator()`
visibility as `protected`.
- `GetClusterConfigurationFunction` now throws an exception if the
cluster configuraiton service is not enabled on the locator.
- `GetClusterConfigurationFunction` now returns null if the cluster
configuraiton service is enabled but not yet started. The caller can
decide whether to retry or fail fast.
---
.../InternalClusterConfigurationService.java | 2 +-
.../distributed/internal/InternalLocator.java | 2 +-
.../internal/cache/ClusterConfigurationLoader.java | 2 +-
.../functions/GetClusterConfigurationFunction.java | 40 ++++---
.../ClusterConfigurationLoaderIntegrationTest.java | 92 ++++++++++++++-
.../GetClusterConfigurationFunctionTest.java | 128 +++++++++++++++++++++
6 files changed, 244 insertions(+), 22 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java
index ed4e467..d750fd6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalClusterConfigurationService.java
@@ -547,7 +547,7 @@ public class InternalClusterConfigurationService implements ClusterConfiguration
* Creates a ConfigurationResponse based on the configRequest, configuration response contains the
* requested shared configuration This method locks the ClusterConfigurationService
*/
- public ConfigurationResponse createConfigurationResponse(Set<String> groups) throws IOException {
+ public ConfigurationResponse createConfigurationResponse(Set<String> groups) {
ConfigurationResponse configResponse = null;
boolean isLocked = lockSharedConfiguration();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 0535277..be5f649 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -179,7 +179,7 @@ public class InternalLocator extends Locator implements ConnectListener {
private volatile Thread restartThread;
- boolean isSharedConfigurationEnabled() {
+ public boolean isSharedConfigurationEnabled() {
return this.config.getEnableClusterConfiguration();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
index 1a18270..2ef9b2b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ClusterConfigurationLoader.java
@@ -303,7 +303,7 @@ public class ClusterConfigurationLoader {
return response;
}
- private ConfigurationResponse requestConfigurationFromOneLocator(
+ protected ConfigurationResponse requestConfigurationFromOneLocator(
InternalDistributedMember locator, Set<String> groups) {
ConfigurationResponse configResponse = null;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java
index 5b70150..099077c 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunction.java
@@ -12,10 +12,8 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.management.internal.configuration.functions;
-import java.io.IOException;
import java.util.Set;
import org.apache.logging.log4j.Logger;
@@ -32,20 +30,36 @@ public class GetClusterConfigurationFunction implements InternalFunction {
@Override
public void execute(FunctionContext context) {
- InternalClusterConfigurationService clusterConfigurationService =
- InternalLocator.getLocator().getSharedConfiguration();
-
Set<String> groups = (Set<String>) context.getArguments();
+ InternalLocator internalLocator = InternalLocator.getLocator();
+ logger.info("Received request for configuration: {}", groups);
+
+ // Return exception to the caller so startup fails fast.
+ if (!internalLocator.isSharedConfigurationEnabled()) {
+ String errorMessage = "The cluster configuration service is not enabled on this member.";
+ logger.warn(errorMessage);
+ context.getResultSender().lastResult(new IllegalStateException(errorMessage));
+ return;
+ }
- logger.info("Received request for configuration : {}", groups);
+ // Shared configuration enabled.
+ if (internalLocator.isSharedConfigurationRunning()) {
+ // Cluster configuration is up and running already.
+ InternalClusterConfigurationService clusterConfigurationService =
+ internalLocator.getSharedConfiguration();
- try {
- ConfigurationResponse response =
- clusterConfigurationService.createConfigurationResponse(groups);
- context.getResultSender().lastResult(response);
- } catch (IOException e) {
- logger.error("Unable to retrieve the cluster configuration", e);
- context.getResultSender().lastResult(e);
+ try {
+ ConfigurationResponse response =
+ clusterConfigurationService.createConfigurationResponse(groups);
+ context.getResultSender().lastResult(response);
+ } catch (Exception exception) {
+ logger.warn("Unable to retrieve the cluster configuration", exception);
+ context.getResultSender().lastResult(exception);
+ }
+ } else {
+ // Cluster configuration service is starting up. Return null so callers can decide whether
+ // to fail fast, or wait and retry later.
+ context.getResultSender().lastResult(null);
}
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java
index 4c026af..25322ca 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClusterConfigurationLoaderIntegrationTest.java
@@ -12,10 +12,10 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.HashSet;
import java.util.Map;
@@ -25,8 +25,13 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.config.ClusterConfigurationNotAvailableException;
import org.apache.geode.management.internal.configuration.domain.Configuration;
import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse;
import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -35,25 +40,100 @@ import org.apache.geode.test.junit.rules.LocatorStarterRule;
@Category(IntegrationTest.class)
public class ClusterConfigurationLoaderIntegrationTest {
+ private ClusterConfigurationLoader loader;
@Rule
- public LocatorStarterRule locator = new LocatorStarterRule().withAutoStart();
-
- private ClusterConfigurationLoader loader;
+ public LocatorStarterRule locator = new LocatorStarterRule();
@Before
public void before() {
- loader = new ClusterConfigurationLoader();
+ loader = Mockito.spy(new ClusterConfigurationLoader());
+ }
+
+ @Test
+ public void requestConfigurationFromLocatorsShouldThrowExceptionWhenClusterConfigurationServiceIsNotEnabled() {
+ locator.withProperty("enable-cluster-configuration", "false").startLocator();
+
+ Set<InternalDistributedMember> locators = new HashSet<>();
+ locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
+ .getDistributedMember());
+
+ assertThatThrownBy(() -> loader.requestConfigurationFromLocators("", locators))
+ .isInstanceOf(FunctionException.class).hasCauseInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("The cluster configuration service is not enabled on this member.");
+ }
+
+ @Test
+ public void requestConfigurationFromLocatorsShouldThrowExceptionAfterTheSixthNullConfigurationResponse() {
+ CustomAnswer customAnswer = new CustomAnswer(10);
+ Mockito.doAnswer(customAnswer).when(loader).requestConfigurationFromOneLocator(Mockito.any(),
+ Mockito.anySet());
+
+ locator.withProperty("enable-cluster-configuration", "true").startLocator();
+ Set<InternalDistributedMember> locators = new HashSet<>();
+ locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
+ .getDistributedMember());
+
+ assertThatThrownBy(() -> loader.requestConfigurationFromLocators("", locators))
+ .isInstanceOf(ClusterConfigurationNotAvailableException.class)
+ .hasMessageContaining("Unable to retrieve cluster configuration from the locator.");
+
+ assertThat(customAnswer.calls).isEqualTo(6);
+ }
+
+
+ @Test
+ public void requestConfigurationFromLocatorsShouldCorrectlyLoadTheClusterConfiguration()
+ throws Exception {
+ locator.withProperty("enable-cluster-configuration", "true").startLocator();
+
+ Set<InternalDistributedMember> locators = new HashSet<>();
+ locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
+ .getDistributedMember());
+
+ ConfigurationResponse response = loader.requestConfigurationFromLocators("", locators);
+ Map<String, Configuration> configurationMap = response.getRequestedConfiguration();
+ assertThat(configurationMap.size()).isEqualTo(1);
+ assertThat(configurationMap.get("cluster")).isNotNull();
}
@Test
- public void requestForClusterConfiguration() throws Exception {
+ public void requestConfigurationFromLocatorsShouldCorrectlyLoadTheClusterConfigurationEvenAfterSeveralRetries()
+ throws Exception {
+ int mockLimit = 6;
+ CustomAnswer customAnswer = new CustomAnswer(mockLimit);
+ Mockito.doAnswer(customAnswer).when(loader).requestConfigurationFromOneLocator(Mockito.any(),
+ Mockito.anySet());
+
+ locator.withProperty("enable-cluster-configuration", "true").startLocator();
Set<InternalDistributedMember> locators = new HashSet<>();
locators.add((InternalDistributedMember) locator.getLocator().getDistributedSystem()
.getDistributedMember());
+
ConfigurationResponse response = loader.requestConfigurationFromLocators("", locators);
Map<String, Configuration> configurationMap = response.getRequestedConfiguration();
assertThat(configurationMap.size()).isEqualTo(1);
assertThat(configurationMap.get("cluster")).isNotNull();
+ assertThat(customAnswer.calls).isEqualTo(mockLimit);
+ }
+
+ class CustomAnswer implements Answer {
+ public int calls;
+ public int mockLimit;
+
+ public CustomAnswer(int mockLimit) {
+ this.calls = 0;
+ this.mockLimit = mockLimit;
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ calls++;
+ if (calls < mockLimit) {
+ return null;
+ } else {
+ return invocation.callRealMethod();
+ }
+ }
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunctionTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunctionTest.java
new file mode 100644
index 0000000..34a3151
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/configuration/functions/GetClusterConfigurationFunctionTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.configuration.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.distributed.internal.InternalClusterConfigurationService;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+@PowerMockIgnore("*.UnitTest")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(InternalLocator.class)
+public class GetClusterConfigurationFunctionTest {
+ private InternalLocator mockedLocator;
+ private FunctionContext mockedFunctionContext;
+ private ResultSender<Object> mockedResultSender;
+ private InternalClusterConfigurationService mockedConfigurationService;
+ private GetClusterConfigurationFunction getClusterConfigurationFunction;
+
+ @Before
+ public void setUp() {
+ mockedResultSender = mock(ResultSender.class);
+ mockedLocator = mock(InternalLocator.class);
+ mockedFunctionContext = mock(FunctionContext.class);
+ mockedConfigurationService = mock(InternalClusterConfigurationService.class);
+ getClusterConfigurationFunction = new GetClusterConfigurationFunction();
+
+ when(mockedLocator.isSharedConfigurationEnabled()).thenReturn(true);
+ when(mockedLocator.isSharedConfigurationRunning()).thenReturn(true);
+ when(mockedFunctionContext.getResultSender()).thenReturn(mockedResultSender);
+ when(mockedFunctionContext.getArguments()).thenReturn(Collections.emptySet());
+
+ PowerMockito.mockStatic(InternalLocator.class);
+ when(InternalLocator.getLocator()).thenReturn(mockedLocator);
+ }
+
+ @Test
+ public void executeShouldReturnIllegalStateExceptionWhenClusterConfigurationServiceIsDisabled() {
+ when(mockedLocator.isSharedConfigurationEnabled()).thenReturn(false);
+ ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
+
+ assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+ .doesNotThrowAnyException();
+ verify(mockedResultSender).lastResult(argumentCaptor.capture());
+ Exception exceptionThrown = argumentCaptor.getValue();
+ assertThat(exceptionThrown).isInstanceOf(IllegalStateException.class)
+ .hasMessage("The cluster configuration service is not enabled on this member.");
+ }
+
+ @Test
+ public void executeShouldReturnExceptionWhenClusterConfigurationServiceIsEnabledButFailuresOccurWhileRetrievingIt() {
+ when(mockedConfigurationService.createConfigurationResponse(any()))
+ .thenThrow(new RuntimeException("Mocked Exception."));
+ when(mockedLocator.getSharedConfiguration()).thenReturn(mockedConfigurationService);
+ ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
+
+ assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+ .doesNotThrowAnyException();
+ verify(mockedResultSender).lastResult(argumentCaptor.capture());
+ Exception exceptionThrown = argumentCaptor.getValue();
+ assertThat(exceptionThrown).isInstanceOf(RuntimeException.class)
+ .hasMessage("Mocked Exception.");
+ }
+
+ @Test
+ public void executeShouldReturnNullWhenClusterConfigurationServiceIsEnabledButNotRunning() {
+ when(mockedLocator.isSharedConfigurationRunning()).thenReturn(false);
+
+ assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+ .doesNotThrowAnyException();
+ verify(mockedResultSender, times(1)).lastResult(null);
+ }
+
+ @Test
+ public void executeShouldReturnTheRequestConfigurationWhenClusterConfigurationServiceIsEnabled() {
+ Set<String> requestedGroups = new HashSet<>(Arrays.asList("group1", "group2"));
+ when(mockedFunctionContext.getArguments()).thenReturn(requestedGroups);
+ when(mockedLocator.getSharedConfiguration()).thenReturn(mockedConfigurationService);
+ ConfigurationResponse mockedResponse = new ConfigurationResponse();
+ when(mockedConfigurationService.createConfigurationResponse(any())).thenReturn(mockedResponse);
+ ArgumentCaptor<ConfigurationResponse> argumentCaptor =
+ ArgumentCaptor.forClass(ConfigurationResponse.class);
+
+ assertThatCode(() -> getClusterConfigurationFunction.execute(mockedFunctionContext))
+ .doesNotThrowAnyException();
+ verify(mockedResultSender).lastResult(argumentCaptor.capture());
+ verify(mockedConfigurationService, times(1)).createConfigurationResponse(requestedGroups);
+ verify(mockedResultSender, times(1)).lastResult(mockedResponse);
+ }
+}
--
To stop receiving notification emails like this one, please contact
jinmeiliao@apache.org.