You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2021/10/11 21:37:09 UTC
[geode] branch support/1.14 updated: GEODE-9640: Initiate threadId
in EventID. (#6905)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new 473f287 GEODE-9640: Initiate threadId in EventID. (#6905)
473f287 is described below
commit 473f287c6ee4d5804b2c8a90ef443ce1b4f5921a
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Oct 5 11:53:05 2021 -0700
GEODE-9640: Initiate threadId in EventID. (#6905)
* This makes sure that a new EventID can be generated after a server is restarted
following a shutdown of the whole cluster.
* Wrap the original threadID around before it interferes with bulkOp- or WAN-generated threadIDs.
(cherry picked from commit 4b3c49e788157df94f7d3e4b455adb7a6eaef96b)
---
.../org/apache/geode/internal/cache/EventID.java | 19 +-
.../apache/geode/internal/cache/EventIDTest.java | 48 +++-
.../DurableClientCQClusterRestartDUnitTest.java | 316 +++++++++++++++++++++
3 files changed, 379 insertions(+), 4 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 5a790da..b3a770e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -821,10 +821,25 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
long sequenceID = (HARegionQueue.INIT_OF_SEQUENCEID + 1);
+ // Seeded from the clock rather than 0. Per GEODE-9640 the intent is that a server restarted
+ // after a whole-cluster shutdown can generate new EventIDs instead of starting over from
+ // thread id 1. The modulo keeps the seed below the per-client limit.
@MakeNotStatic
- private static final AtomicLong atmLong = new AtomicLong(0);
+ private static final AtomicLong atmLong = new AtomicLong(System.currentTimeMillis() %
+     ThreadIdentifier.MAX_THREAD_PER_CLIENT);
+ /**
+  * Takes the next thread id from the shared counter. Ids stay within
+  * [1, MAX_THREAD_PER_CLIENT - 1]: 1,000,000 and above would interfere with the thread ids
+  * generated for bulkOp (and, per GEODE-9640, WAN) events, so the counter wraps back to 1
+  * instead of ever reaching the limit.
+  */
ThreadAndSequenceIDWrapper() {
- threadID = atmLong.incrementAndGet();
+   final long limit = ThreadIdentifier.MAX_THREAD_PER_CLIENT;
+   // Increment and wrap in ONE atomic step. Resetting with a separate set(0) after the
+   // increment lets other threads observe values above the limit and forces them to spin on
+   // the counter until the resetting thread has run.
+   threadID = atmLong.updateAndGet(current -> {
+     long next = current + 1;
+     return next < limit ? next : 1;
+   });
}
long getAndIncrementSequenceID() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java
index 6287280..cd9932e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java
@@ -14,18 +14,32 @@
*/
package org.apache.geode.internal.cache;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
-import org.assertj.core.api.Assertions;
+import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.DataSerializer;
import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.VersionedDataInputStream;
+import org.apache.geode.test.junit.Repeat;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.RepeatRule;
public class EventIDTest {
+ // Runs the tasks that threadIDIsWrappedAround submits to obtain thread ids.
+ @Rule
+ public ExecutorServiceRule executorService = new ExecutorServiceRule();
+
+ // NOTE(review): presumably what makes @Repeat(10) on threadIDIsWrappedAround take effect —
+ // confirm against RepeatRule.
+ @Rule
+ public RepeatRule repeat = new RepeatRule();
@Test
public void emptyEventIdCanBeSerializedWithCurrentVersion()
@@ -48,7 +62,37 @@ public class EventIDTest {
EventID result = DataSerializer.readObject(
new VersionedDataInputStream(new ByteArrayInputStream(out.toByteArray()), version));
- Assertions.assertThat(result.getMembershipID()).isEqualTo(eventID.getMembershipID());
+ assertThat(result.getMembershipID()).isEqualTo(eventID.getMembershipID());
+ }
+
+ /**
+  * Creates one wrapper directly, 100,000 more through the executor and, once all of those have
+  * completed, one final wrapper. The final thread id must be the first id advanced by 100,001,
+  * wrapped around so that it never reaches ThreadIdentifier.MAX_THREAD_PER_CLIENT.
+  */
+ @Test
+ @Repeat(10)
+ public void threadIDIsWrappedAround() throws Exception {
+   final long firstId = new EventID.ThreadAndSequenceIDWrapper().threadID;
+   final int taskCount = 100000;
+
+   List<Future<Long>> pending = new ArrayList<>();
+   for (int submitted = 0; submitted < taskCount; submitted++) {
+     pending.add(executorService.submit(this::getThreadID));
+   }
+   // Every id request must have finished before the final id is taken.
+   for (Future<Long> task : pending) {
+     task.get();
+   }
+   long finalId = executorService.submit(this::getThreadID).get();
+
+   long unwrapped = firstId + taskCount + 1;
+   // On wrap-around the limit itself (1,000,000) is never handed out, hence the extra + 1.
+   long wanted = unwrapped >= ThreadIdentifier.MAX_THREAD_PER_CLIENT
+       ? unwrapped - ThreadIdentifier.MAX_THREAD_PER_CLIENT + 1
+       : unwrapped;
+   assertThat(finalId).isEqualTo(wanted);
}
+ /** Creates a fresh wrapper and returns the thread id it was assigned. */
+ private long getThreadID() {
+   return new EventID.ThreadAndSequenceIDWrapper().threadID;
+ }
}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQClusterRestartDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQClusterRestartDUnitTest.java
new file mode 100644
index 0000000..7bf19a1
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQClusterRestartDUnitTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.UniquePortSupplier;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+ /**
+  * Distributed test for CQ delivery to a durable client across a restart of the whole cluster.
+  *
+  * <p>
+  * Each round starts a locator and a server hosting a persistent replicated region, lets the
+  * client trigger puts through server-side function executions, verifies the CQ listener's
+  * event count and then shuts the server and the locator down again. The test runs two rounds.
+  */
+ public class DurableClientCQClusterRestartDUnitTest implements Serializable {
+   private String hostName;
+   private String uniqueName;
+   private String regionName;
+   private VM server;
+   private VM locator;
+   private VM client;
+   private File locatorLog;
+   private File serverLog;
+   private File restartLocatorLog;
+   private File restartServerLog;
+   private int locatorPort;
+   private String durableClientId;
+   private final String cqName = "cqQuery";
+   // Number of function executions (one put each) per round.
+   private final int numOfInvocations = 5;
+   // Passed to the server as the "gemfire.jg-bind-port" system property.
+   private final int uniquePort = new UniquePortSupplier().getAvailablePort();
+
+   @Rule
+   public DistributedRule distributedRule = new DistributedRule();
+
+   @Rule
+   public CacheRule cacheRule = new CacheRule();
+
+   @Rule
+   public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+   @Rule
+   public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+   @Rule
+   public SerializableTestName testName = new SerializableTestName();
+
+   @Rule
+   public DistributedExecutorServiceRule executorServiceRule = new DistributedExecutorServiceRule();
+
+   /** Assigns the VMs and derives all names and log files from the test class and method. */
+   @Before
+   public void setup() throws Exception {
+     locator = VM.getVM(0);
+     server = VM.getVM(1);
+     client = VM.getVM(2);
+     hostName = getHostName();
+
+     String testId = getClass().getSimpleName() + "_" + testName.getMethodName();
+     uniqueName = testId;
+     regionName = testId + "_region";
+
+     locatorLog = new File(temporaryFolder.newFolder(testId), "locator.log");
+     serverLog = fileInTemporaryRoot("server.log");
+     restartLocatorLog = fileInTemporaryRoot("locator_restart.log");
+     restartServerLog = fileInTemporaryRoot("server_restart.log");
+     durableClientId = testId + "client";
+   }
+
+   /** Resolves {@code fileName} directly under the temporary folder's root. */
+   private File fileInTemporaryRoot(String fileName) {
+     return temporaryFolder.getRoot().toPath().resolve(fileName).toFile();
+   }
+
+   @Test
+   public void cqEventsNotLostIfClusterRestarted() {
+     // First round: numOfInvocations events are expected.
+     startClusterAndDoFunctionCalls(locatorLog, serverLog, 1);
+
+     // Second round after the restart: the listener must have seen 2 * numOfInvocations events.
+     startClusterAndDoFunctionCalls(restartLocatorLog, restartServerLog, 2);
+   }
+
+   /**
+    * Runs one complete round: start locator and server, prepare the client, execute the
+    * functions, verify the listener count, then take the server and the locator down.
+    *
+    * @param locatorLog log file for the locator started in this round
+    * @param serverLog log file for the server started in this round
+    * @param iteration 1-based round number; numOfInvocations * iteration events are expected
+    */
+   private void startClusterAndDoFunctionCalls(File locatorLog, File serverLog, int iteration) {
+     // Bring the cluster up.
+     locatorPort = locator.invoke(() -> startLocator(locatorLog));
+     server.invoke(() -> createCacheServerAndDiskRegion(serverLog));
+
+     // Client side: region and CQ (created only once), then the function calls.
+     client.invoke(this::setClientRegion);
+     client.invoke(() -> clientCacheRule.getClientCache().readyForEvents());
+     client.invoke(this::callFunctions);
+     client.invoke(() -> verifyCQListenerInvocations(numOfInvocations * iteration));
+
+     // Take the cluster down.
+     server.invoke(this::closeCache);
+     server.bounceForcibly();
+     locator.invoke(this::stopLocator);
+   }
+
+   /**
+    * Starts a locator bound to {@code hostName} on {@code locatorPort}.
+    *
+    * @return the port the started locator reports
+    */
+   private int startLocator(File locatorLog) throws IOException {
+     return Locator
+         .startLocatorAndDS(locatorPort, locatorLog, InetAddress.getByName(hostName),
+             new Properties())
+         .getPort();
+   }
+
+   /** Creates the server cache, its disk-backed region and a cache server on port 0. */
+   private void createCacheServerAndDiskRegion(File logFile) throws Exception {
+     Properties bindPortProperties = new Properties();
+     bindPortProperties.setProperty("gemfire.jg-bind-port", String.valueOf(uniquePort));
+     cacheRule.createCache(createServerConfig(logFile), bindPortProperties);
+
+     createDiskRegionIfNotExist();
+
+     CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+     cacheServer.setPort(0);
+     cacheServer.start();
+   }
+
+   /** Builds the server configuration: locator address and log file. */
+   private Properties createServerConfig(File logFile) {
+     String locators = hostName + "[" + locatorPort + "]";
+     String logPath = logFile.getAbsolutePath();
+
+     Properties serverConfig = new Properties();
+     serverConfig.setProperty(LOCATORS, locators);
+     serverConfig.setProperty(LOG_FILE, logPath);
+     return serverConfig;
+   }
+
+   /** Closes the cache held by the cache rule. */
+   private void closeCache() {
+     cacheRule.closeAndNullCache();
+   }
+
+   /** Stops the locator running in the current VM. */
+   private void stopLocator() {
+     Locator.getLocator().stop();
+   }
+
+   /** Creates the disk store and the persistent replicated region unless the region exists. */
+   private void createDiskRegionIfNotExist() throws IOException {
+     boolean regionExists = cacheRule.getCache().getRegion(regionName) != null;
+     if (!regionExists) {
+       String diskStoreName = getDiskStoreName();
+
+       DiskStoreFactory storeFactory =
+           cacheRule.getCache().createDiskStoreFactory(new DiskStoreAttributes());
+       storeFactory.setDiskDirs(new File[] {createOrGetDir()});
+       storeFactory.create(diskStoreName);
+
+       RegionFactory<Object, Object> persistentRegionFactory =
+           cacheRule.getCache().createRegionFactory(REPLICATE_PERSISTENT);
+       persistentRegionFactory.setDiskStoreName(diskStoreName);
+       persistentRegionFactory.create(regionName);
+     }
+   }
+
+   /** Returns the disk store directory under the temporary root, creating it when missing. */
+   private File createOrGetDir() throws IOException {
+     String dirName = getDiskStoreName();
+     File candidate = new File(temporaryFolder.getRoot(), dirName);
+     return candidate.exists() ? candidate : temporaryFolder.newFolder(dirName);
+   }
+
+   /** Disk store name: the test class's simple name followed by the current VM id. */
+   private String getDiskStoreName() {
+     return getClass().getSimpleName() + VM.getVMId();
+   }
+
+   /** Submits numOfInvocations function executions and waits for each of them to finish. */
+   private void callFunctions() throws Exception {
+     InternalRegion clientRegion =
+         (InternalRegion) clientCacheRule.getClientCache().getRegion(regionName);
+     Pool pool = clientRegion.getServerProxy().getPool();
+
+     List<Future<Void>> executions = new ArrayList<>();
+     int remaining = numOfInvocations;
+     while (remaining-- > 0) {
+       executions.add(executorServiceRule.submit(() -> invokeFunction(pool)));
+     }
+
+     long timeoutMillis = getTimeout().toMillis();
+     for (Future<Void> execution : executions) {
+       execution.get(timeoutMillis, MILLISECONDS);
+     }
+   }
+
+   /** Executes {@link TestFunction} on a server of {@code pool} and waits for its result. */
+   private void invokeFunction(Pool pool) {
+     @SuppressWarnings("unchecked")
+     Execution<String, Void, Void> onServer =
+         FunctionService.onServer(pool).setArguments(regionName);
+     onServer.execute(new TestFunction()).getResult();
+   }
+
+   /**
+    * Creates the durable client cache if there is none yet and, unless the region already
+    * exists, the pool, the local client region and the CQ.
+    */
+   private void setClientRegion() throws Exception {
+     if (clientCacheRule.getClientCache() == null) {
+       Properties durableConfig = new Properties();
+       durableConfig.setProperty(DURABLE_CLIENT_ID, durableClientId);
+       clientCacheRule.createClientCache(durableConfig);
+     }
+
+     boolean regionExists = clientCacheRule.getClientCache().getRegion(regionName) != null;
+     if (regionExists) {
+       return;
+     }
+
+     Pool subscriptionPool = PoolManager.createFactory()
+         .addLocator(hostName, locatorPort)
+         .setSubscriptionEnabled(true)
+         .create(uniqueName);
+
+     ClientRegionFactory<Object, Object> clientRegionFactory =
+         clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+     clientRegionFactory.setPoolName(subscriptionPool.getName());
+     clientRegionFactory.create(regionName);
+
+     registerCQ();
+   }
+
+   /** Registers the CQ over the whole region with a {@link TestCqListener} and executes it. */
+   private void registerCQ() throws Exception {
+     CqAttributesFactory attributesFactory = new CqAttributesFactory();
+     attributesFactory.addCqListener(new TestCqListener());
+     CqAttributes attributes = attributesFactory.create();
+
+     String query = "Select * from " + SEPARATOR + regionName;
+     QueryService queryService = clientCacheRule.getClientCache().getQueryService();
+     queryService.newCq(cqName, query, attributes, true).executeWithInitialResults();
+   }
+
+   /** Waits until the CQ listener has counted exactly {@code expected} events. */
+   private void verifyCQListenerInvocations(int expected) {
+     QueryService queryService = clientCacheRule.getClientCache().getQueryService();
+     await().untilAsserted(() -> {
+       TestCqListener listener =
+           (TestCqListener) queryService.getCq(cqName).getCqAttributes().getCqListener();
+       assertThat(listener.numEvents.get()).isEqualTo(expected);
+     });
+   }
+
+   /**
+    * Function that waits until at least one CQ is registered and then puts a random int, used
+    * as both key and value, into the region named by the function argument.
+    */
+   public static class TestFunction implements Function<String>, DataSerializable {
+     private final Random random = new Random();
+
+     @Override
+     public void execute(FunctionContext<String> context) {
+       waitUntilCqRegistered(((InternalCache) context.getCache()).getCqService());
+
+       Region<Object, Object> target = context.getCache().getRegion(context.getArguments());
+       int randomKey = random.nextInt();
+       target.put(randomKey, randomKey);
+       context.getResultSender().lastResult(true);
+     }
+
+     /** Blocks until the CQ service reports at least one CQ. */
+     private void waitUntilCqRegistered(CqService cqService) {
+       await().untilAsserted(() -> assertThat(cqService.getAllCqs().size()).isGreaterThan(0));
+     }
+
+     // Nothing is written or read: the function carries no serialized state.
+     @Override
+     public void toData(DataOutput out) {}
+
+     @Override
+     public void fromData(DataInput in) {}
+   }
+
+   /** CQ listener that only counts the events it receives. */
+   private static class TestCqListener implements CqListener, Serializable {
+     AtomicInteger numEvents = new AtomicInteger();
+
+     @Override
+     public void onEvent(CqEvent aCqEvent) {
+       numEvents.incrementAndGet();
+     }
+
+     @Override
+     public void onError(CqEvent aCqEvent) {}
+   }
+ }