You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2021/10/11 21:37:09 UTC

[geode] branch support/1.14 updated: GEODE-9640: Initiate threadId in EventID. (#6905)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new 473f287  GEODE-9640: Initiate threadId in EventID. (#6905)
473f287 is described below

commit 473f287c6ee4d5804b2c8a90ef443ce1b4f5921a
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Oct 5 11:53:05 2021 -0700

    GEODE-9640: Initiate threadId in EventID. (#6905)
    
      * This is to make sure a new EventID can be generated after server restarted
        after a whole cluster is shut down.
    
     * Wrap around original threadID before it interferes with bulkOp or wan generated threadID.
    
    (cherry picked from commit 4b3c49e788157df94f7d3e4b455adb7a6eaef96b)
---
 .../org/apache/geode/internal/cache/EventID.java   |  19 +-
 .../apache/geode/internal/cache/EventIDTest.java   |  48 +++-
 .../DurableClientCQClusterRestartDUnitTest.java    | 316 +++++++++++++++++++++
 3 files changed, 379 insertions(+), 4 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 5a790da..b3a770e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -821,10 +821,25 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
     long sequenceID = (HARegionQueue.INIT_OF_SEQUENCEID + 1);
 
     @MakeNotStatic
-    private static final AtomicLong atmLong = new AtomicLong(0);
+    private static final AtomicLong atmLong = new AtomicLong(System.currentTimeMillis() %
+        ThreadIdentifier.MAX_THREAD_PER_CLIENT);
 
     ThreadAndSequenceIDWrapper() {
-      threadID = atmLong.incrementAndGet();
+      long id = atmLong.incrementAndGet();
+      // wrap around before hitting 1,000,000 as higher number will interfere with bulkOp threadID
+      // generation.
+      if (id < ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
+        threadID = id;
+      } else if (id == ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
+        atmLong.set(0);
+        threadID = atmLong.incrementAndGet();
+      } else {
+        id = atmLong.incrementAndGet();
+        while (id > ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
+          id = atmLong.incrementAndGet();
+        }
+        threadID = id;
+      }
     }
 
     long getAndIncrementSequenceID() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java
index 6287280..cd9932e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EventIDTest.java
@@ -14,18 +14,32 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
 
-import org.assertj.core.api.Assertions;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
+import org.apache.geode.test.junit.Repeat;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.RepeatRule;
 
 public class EventIDTest {
+  @Rule
+  public ExecutorServiceRule executorService = new ExecutorServiceRule();
+
+  @Rule
+  public RepeatRule repeat = new RepeatRule();
 
   @Test
   public void emptyEventIdCanBeSerializedWithCurrentVersion()
@@ -48,7 +62,37 @@ public class EventIDTest {
     EventID result = DataSerializer.readObject(
         new VersionedDataInputStream(new ByteArrayInputStream(out.toByteArray()), version));
 
-    Assertions.assertThat(result.getMembershipID()).isEqualTo(eventID.getMembershipID());
+    assertThat(result.getMembershipID()).isEqualTo(eventID.getMembershipID());
+  }
+
+  @Test
+  @Repeat(10)
+  public void threadIDIsWrappedAround() throws Exception {
+    EventID.ThreadAndSequenceIDWrapper wrapper = new EventID.ThreadAndSequenceIDWrapper();
+    long start = wrapper.threadID;
+
+    int numberOfThreads = 100000;
+
+    List<Future<Long>> futures = new ArrayList<>();
+    for (int i = 0; i < numberOfThreads; i++) {
+      futures.add(executorService.submit(this::getThreadID));
+    }
+    for (Future<Long> future : futures) {
+      future.get();
+    }
+    long lastThreadID = executorService.submit(this::getThreadID).get();
+    long expected = start + numberOfThreads + 1;
+    if (expected >= ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
+      // wrap around ThreadIdentifier.MAX_THREAD_PER_CLIENT (1,000,000) and 1,000,000
+      // is never used.
+      assertThat(lastThreadID).isEqualTo(expected - ThreadIdentifier.MAX_THREAD_PER_CLIENT + 1);
+    } else {
+      assertThat(lastThreadID).isEqualTo(expected);
+    }
   }
 
+  private long getThreadID() {
+    EventID.ThreadAndSequenceIDWrapper wrapper = new EventID.ThreadAndSequenceIDWrapper();
+    return wrapper.threadID;
+  }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQClusterRestartDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQClusterRestartDUnitTest.java
new file mode 100644
index 0000000..7bf19a1
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQClusterRestartDUnitTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.UniquePortSupplier;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class DurableClientCQClusterRestartDUnitTest implements Serializable {
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+  private VM server;
+  private VM locator;
+  private VM client;
+  private File locatorLog;
+  private File serverLog;
+  private File restartLocatorLog;
+  private File restartServerLog;
+  private int locatorPort;
+  private String durableClientId;
+  private final String cqName = "cqQuery";
+  private final int numOfInvocations = 5;
+  private final int uniquePort = new UniquePortSupplier().getAvailablePort();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule = new DistributedExecutorServiceRule();
+
+  @Before
+  public void setup() throws Exception {
+    locator = VM.getVM(0);
+    server = VM.getVM(1);
+    client = VM.getVM(2);
+
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+    locatorLog = new File(temporaryFolder.newFolder(uniqueName), "locator.log");
+    serverLog = temporaryFolder.getRoot().toPath().resolve("server.log").toFile();
+    restartLocatorLog = temporaryFolder.getRoot().toPath().resolve("locator_restart.log").toFile();
+    restartServerLog = temporaryFolder.getRoot().toPath().resolve("server_restart.log").toFile();
+    durableClientId = uniqueName + "client";
+  }
+
+  @Test
+  public void cqEventsNotLostIfClusterRestarted() {
+    startClusterAndDoFunctionCalls(locatorLog, serverLog, 1);
+
+    // restart cluster and perform another round of function calls.
+    startClusterAndDoFunctionCalls(restartLocatorLog, restartServerLog, 2);
+
+  }
+
+  private void startClusterAndDoFunctionCalls(File locatorLog, File serverLog, int iteration) {
+    locatorPort = locator.invoke(() -> startLocator(locatorLog));
+    server.invoke(() -> createCacheServerAndDiskRegion(serverLog));
+    client.invoke(this::setClientRegion);
+    client.invoke(() -> clientCacheRule.getClientCache().readyForEvents());
+    client.invoke(this::callFunctions);
+    client.invoke(() -> verifyCQListenerInvocations(numOfInvocations * iteration));
+    server.invoke(this::closeCache);
+    server.bounceForcibly();
+    locator.invoke(this::stopLocator);
+  }
+
+  private int startLocator(File locatorLog) throws IOException {
+    InetAddress bindAddress = InetAddress.getByName(hostName);
+    Locator locator =
+        Locator.startLocatorAndDS(locatorPort, locatorLog, bindAddress, new Properties());
+    return locator.getPort();
+  }
+
+  private void createCacheServerAndDiskRegion(File logFile) throws Exception {
+    Properties systemProperties = new Properties();
+    systemProperties.setProperty("gemfire.jg-bind-port", Integer.toString(uniquePort));
+    cacheRule.createCache(createServerConfig(logFile), systemProperties);
+
+    createDiskRegionIfNotExist();
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+  }
+
+  private Properties createServerConfig(File logFile) {
+    Properties config = new Properties();
+    config.setProperty(LOCATORS, hostName + "[" + locatorPort + "]");
+    config.setProperty(LOG_FILE, logFile.getAbsolutePath());
+    return config;
+  }
+
+  private void closeCache() {
+    cacheRule.closeAndNullCache();
+  }
+
+  private void stopLocator() {
+    Locator.getLocator().stop();
+  }
+
+  private void createDiskRegionIfNotExist() throws IOException {
+    if (cacheRule.getCache().getRegion(regionName) != null) {
+      return;
+    }
+    DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
+    DiskStoreFactory diskStoreFactory =
+        cacheRule.getCache().createDiskStoreFactory(diskStoreAttributes);
+    diskStoreFactory.setDiskDirs(new File[] {createOrGetDir()});
+    diskStoreFactory.create(getDiskStoreName());
+
+    RegionFactory<Object, Object> regionFactory =
+        cacheRule.getCache().createRegionFactory(REPLICATE_PERSISTENT);
+    regionFactory.setDiskStoreName(getDiskStoreName());
+    regionFactory.create(regionName);
+  }
+
+  private File createOrGetDir() throws IOException {
+    File dir = new File(temporaryFolder.getRoot(), getDiskStoreName());
+    if (!dir.exists()) {
+      dir = temporaryFolder.newFolder(getDiskStoreName());
+    }
+    return dir;
+  }
+
+  private String getDiskStoreName() {
+    return getClass().getSimpleName() + VM.getVMId();
+  }
+
+  private void callFunctions() throws Exception {
+    InternalRegion region = (InternalRegion) clientCacheRule.getClientCache().getRegion(regionName);
+    Pool pool = region.getServerProxy().getPool();
+
+    List<Future<Void>> futures = new ArrayList<>();
+    for (int i = 0; i < numOfInvocations; i++) {
+      futures.add(executorServiceRule.submit(() -> invokeFunction(pool)));
+    }
+    for (Future<Void> future : futures) {
+      future.get(getTimeout().toMillis(), MILLISECONDS);
+    }
+  }
+
+  private void invokeFunction(Pool pool) {
+    @SuppressWarnings("unchecked")
+    Execution<String, Void, Void> execution =
+        FunctionService.onServer(pool).setArguments(regionName);
+    ResultCollector<Void, Void> resultCollector = execution.execute(new TestFunction());
+    resultCollector.getResult();
+  }
+
+  private void setClientRegion() throws Exception {
+    ClientCache clientCache = clientCacheRule.getClientCache();
+    if (clientCache == null) {
+      Properties config = new Properties();
+      config.setProperty(DURABLE_CLIENT_ID, durableClientId);
+      clientCacheRule.createClientCache(config);
+    }
+
+    Region<Object, Object> region = clientCacheRule.getClientCache().getRegion(regionName);
+    if (region != null) {
+      return;
+    }
+    Pool pool = PoolManager.createFactory()
+        .addLocator(hostName, locatorPort)
+        .setSubscriptionEnabled(true)
+        .create(uniqueName);
+
+    ClientRegionFactory<Object, Object> crf =
+        clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    crf.setPoolName(pool.getName());
+    crf.create(regionName);
+
+    registerCQ();
+  }
+
+  private void registerCQ() throws Exception {
+    ClientCache clientCache = clientCacheRule.getClientCache();
+
+    QueryService queryService = clientCache.getQueryService();
+    CqAttributesFactory cqaf = new CqAttributesFactory();
+    cqaf.addCqListener(new TestCqListener());
+    CqAttributes cqAttributes = cqaf.create();
+
+    queryService.newCq(cqName, "Select * from " + SEPARATOR + regionName,
+        cqAttributes, true).executeWithInitialResults();
+  }
+
+  private void verifyCQListenerInvocations(int expected) {
+    QueryService cqService = clientCacheRule.getClientCache().getQueryService();
+    await().untilAsserted(() -> {
+      CqListener cqListener = cqService.getCq(cqName).getCqAttributes().getCqListener();
+      assertThat(((TestCqListener) cqListener).numEvents.get()).isEqualTo(expected);
+    });
+  }
+
+  public static class TestFunction implements Function<String>, DataSerializable {
+    private final Random random = new Random();
+
+    @Override
+    public void execute(FunctionContext<String> context) {
+      CqService cqService = ((InternalCache) context.getCache()).getCqService();
+      waitUntilCqRegistered(cqService);
+
+      String regionName = context.getArguments();
+      Region<Object, Object> region = context.getCache().getRegion(regionName);
+      int key = random.nextInt();
+      region.put(key, key);
+      context.getResultSender().lastResult(true);
+    }
+
+    private void waitUntilCqRegistered(CqService cqService) {
+      await().untilAsserted(() -> assertThat(cqService.getAllCqs().size()).isGreaterThan(0));
+    }
+
+    @Override
+    public void toData(DataOutput out) {}
+
+    @Override
+    public void fromData(DataInput in) {}
+  }
+
+  private static class TestCqListener implements CqListener, Serializable {
+    AtomicInteger numEvents = new AtomicInteger();
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {
+      numEvents.incrementAndGet();
+    }
+
+    @Override
+    public void onError(CqEvent aCqEvent) {}
+  }
+}