You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/07/12 16:40:47 UTC
[geode] branch support/1.14 updated: GEODE-9346: When client
received incorrect byte array of PdxType due to broken socket,
it should be retried (#6561) (#6687)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new dba525c GEODE-9346: When client received incorrect byte array of PdxType due to broken socket, it should be retried (#6561) (#6687)
dba525c is described below
commit dba525cb167ac38e7e9c25d84d1d2d2eaf2305bb
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Mon Jul 12 09:39:55 2021 -0700
GEODE-9346: When client received incorrect byte array of PdxType due to broken socket, it should be retried (#6561) (#6687)
(cherry picked from commit 11e4c3a4ca4bf7ef2203e0fdd111e536e5721e50)
---
.../geode/cache/query/dunit/PDXQueryTestBase.java | 19 +-
.../cache/query/dunit/PdxLocalQueryDUnitTest.java | 6 +-
.../query/dunit/PdxMultiThreadQueryDUnitTest.java | 368 +++++++++++++++++++++
.../geode/cache/query/dunit/PdxQueryDUnitTest.java | 67 ++--
.../geode/cache/client/internal/QueryOp.java | 28 +-
5 files changed, 442 insertions(+), 46 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
index 228362a..3f59d97 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
@@ -82,7 +83,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
preTearDownPDXQueryTestBase();
disconnectAllFromDS(); // tests all expect to create a new ds
// Reset the testObject numinstance for the next test.
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
// In all VM.
resetTestObjectInstanceCount();
}
@@ -96,11 +97,11 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
vm.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PortfolioPdx.numInstance = 0;
PositionPdx.numInstance = 0;
PositionPdx.cnt = 0;
- TestObject2.numInstance = 0;
+ TestObject2.numInstance.set(0);
}
});
}
@@ -306,15 +307,15 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
public static class TestObject2 implements PdxSerializable {
public int _id;
- public static int numInstance = 0;
+ public static AtomicInteger numInstance = new AtomicInteger();
public TestObject2() {
- numInstance++;
+ numInstance.incrementAndGet();
}
public TestObject2(int id) {
this._id = id;
- numInstance++;
+ numInstance.incrementAndGet();
}
public int getId() {
@@ -359,7 +360,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
public int important;
public int selection;
public int select;
- public static int numInstance = 0;
+ public static final AtomicInteger numInstance = new AtomicInteger();
public Map idTickers = new HashMap();
public HashMap positions = new HashMap();
public TestObject2 test;
@@ -368,7 +369,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
if (log != null) {
log.info("TestObject ctor stack trace", new Exception());
}
- numInstance++;
+ numInstance.incrementAndGet();
}
public TestObject(int id, String ticker) {
@@ -381,7 +382,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
this.important = id;
this.selection = id;
this.select = id;
- numInstance++;
+ numInstance.incrementAndGet();
idTickers.put(id + "", ticker);
this.test = new TestObject2(id);
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
index 1c4ce10..60484c1 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
@@ -899,7 +899,7 @@ public class PdxLocalQueryDUnitTest extends PDXQueryTestBase {
protected final void preTearDownPDXQueryTestBase() throws Exception {
disconnectAllFromDS(); // tests all expect to create a new ds
// Reset the testObject numinstance for the next test.
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PortfolioPdx.DEBUG = false;
// In all VM.
resetTestObjectInstanceCount();
@@ -917,11 +917,11 @@ public class PdxLocalQueryDUnitTest extends PDXQueryTestBase {
vm.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PortfolioPdx.numInstance = 0;
PositionPdx.numInstance = 0;
PositionPdx.cnt = 0;
- TestObject2.numInstance = 0;
+ TestObject2.numInstance.set(0);
}
});
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java
new file mode 100644
index 0000000..6e1bc7e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.SerializationException;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.cache.query.FunctionDomainException;
+import org.apache.geode.cache.query.NameResolutionException;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryInvocationTargetException;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.TypeMismatchException;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxSerializationException;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.version.VersionManager;
+import org.apache.geode.util.internal.GeodeGlossary;
+
+@Category({OQLQueryTest.class})
+public class PdxMultiThreadQueryDUnitTest extends PDXQueryTestBase {
+ public static final Logger logger = LogService.getLogger();
+ static final int numberOfEntries = 100;
+ final String poolName = "testClientServerQueryPool";
+ final String hostName = NetworkUtils.getServerHostName();
+ private VM server0;
+ private VM server1;
+ private VM server2;
+ private VM client;
+ private int port0;
+ private int port1;
+ private int port2;
+
+  /**
+   * Creates the test instance. Does nothing beyond the implicit superclass construction; the VMs
+   * used by the tests are assigned in {@link #startUpServersAndClient()}.
+   */
+  public PdxMultiThreadQueryDUnitTest() {}
+
+  /**
+   * Assigns dunit VMs 0-2 as servers and VM 3 as the client, invokes
+   * {@code configAndStartBridgeServer} in each server VM, and stores the value returned by
+   * {@code PdxQueryDUnitTest.getCacheServerPort} for each server.
+   */
+  @Before
+  public void startUpServersAndClient() {
+    final Host dunitHost = Host.getHost(0);
+
+    server0 = dunitHost.getVM(VersionManager.CURRENT_VERSION, 0);
+    server1 = dunitHost.getVM(VersionManager.CURRENT_VERSION, 1);
+    server2 = dunitHost.getVM(VersionManager.CURRENT_VERSION, 2);
+    client = dunitHost.getVM(VersionManager.CURRENT_VERSION, 3);
+
+    // Start the servers one after another, in VM order, before asking any of them for its port.
+    Arrays.asList(server0, server1, server2).forEach(
+        serverVm -> serverVm.invoke((SerializableRunnableIF) this::configAndStartBridgeServer));
+
+    port0 = server0.invoke(PdxQueryDUnitTest::getCacheServerPort);
+    port1 = server1.invoke(PdxQueryDUnitTest::getCacheServerPort);
+    port2 = server2.invoke(PdxQueryDUnitTest::getCacheServerPort);
+  }
+
+  /** Calls {@code closeClient} on the client VM first and then on server2, server1 and server0. */
+  @After
+  public void closeServersAndClient() {
+    for (VM vm : Arrays.asList(client, server2, server1, server0)) {
+      closeClient(vm);
+    }
+  }
+
+ @Test
+ public void testClientServerQuery() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObject(i, "vmware"));
+ }
+ });
+
+ // Create client region
+ client.invoke(() -> {
+ ClientCacheFactory cf = new ClientCacheFactory();
+ cf.addPoolServer(hostName, port1);
+ cf.addPoolServer(hostName, port2);
+ cf.addPoolServer(hostName, port0);
+ cf.setPdxReadSerialized(false);
+ ClientCache clientCache = getClientCache(cf);
+ Region<Object, Object> region =
+ clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+
+ logger.info("### Executing Query on server: " + queryString[1] + ": from client region: "
+ + region.getFullPath());
+ final int size = 100;
+ IntStream.range(0, size).parallel().forEach(a -> {
+ try {
+ SelectResults<TestObjectThrowsPdxSerializationException> selectResults =
+ region.query(queryString[1]);
+ assertThat(selectResults.size()).isEqualTo(numberOfEntries);
+ } catch (FunctionDomainException | TypeMismatchException | NameResolutionException
+ | QueryInvocationTargetException e) {
+ fail("Unexpected query exception:" + e.getMessage());
+ }
+ });
+ await().until(() -> TestObject.numInstance.get() == size * numberOfEntries);
+ });
+ }
+
+ @Test
+ public void testClientServerQueryUsingRemoteQueryService()
+ throws CacheException, InterruptedException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObject(i, "vmware"));
+ }
+ });
+
+ // Create client pool.
+ createPool(client, poolName, new String[] {hostName, hostName, hostName},
+ new int[] {port0, port1, port2}, true);
+
+ final int size = 100;
+ AsyncInvocation[] asyncInvocationArray = new AsyncInvocation[size];
+ for (int i = 0; i < size; i++) {
+ asyncInvocationArray[i] =
+ client.invokeAsync(() -> {
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ SelectResults<TestObjectThrowsPdxSerializationException> selectResults =
+ (SelectResults) query.execute();
+ assertThat(selectResults.size()).isEqualTo(numberOfEntries);
+ });
+ }
+
+ for (int i = 0; i < size; i++) {
+ asyncInvocationArray[i].await();
+ }
+ client
+ .invoke(() -> await().until(() -> TestObject.numInstance.get() == size * numberOfEntries));
+ }
+
+ @Test
+ public void testRetrySucceedWithPdxSerializationException() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObjectThrowsPdxSerializationException());
+ }
+ });
+
+ // Create client pool with 3 servers
+ createPool(client, poolName, new String[] {hostName, hostName, hostName},
+ new int[] {port0, port1, port2}, true);
+
+ client.invoke(() -> {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true");
+ try {
+ TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true;
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ SelectResults<TestObjectThrowsPdxSerializationException> selectResults =
+ (SelectResults) query.execute();
+ assertThat(selectResults.size()).isEqualTo(numberOfEntries);
+ // the 2 failed try incremented numInstance
+ assertThat(numberOfEntries + 2)
+ .isEqualTo(TestObjectThrowsPdxSerializationException.numInstance.get());
+ } finally {
+ assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization)
+ .isFalse();
+ TestObjectThrowsPdxSerializationException.numInstance.set(0);
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false");
+ }
+ });
+ }
+
+  /**
+   * Without the {@code enableQueryRetryOnPdxSerializationException} system property being set in
+   * this test, the query is expected to fail with a {@code ServerOperationException} caused by a
+   * {@code SerializationException}, after exactly one instance was created in the client VM.
+   */
+  @Test
+  public void testRetryNotEnabledBySystemProperty() throws CacheException {
+    // Populate the region through server0.
+    server0.invoke(() -> {
+      Region<Object, Object> serverRegion = getRootRegion().getSubregion(regionName);
+      IntStream.range(0, numberOfEntries).forEach(
+          i -> serverRegion.put("key-" + i, new TestObjectThrowsPdxSerializationException()));
+    });
+
+    // Client pool over two servers only.
+    final String[] poolHosts = {hostName, hostName};
+    final int[] poolPorts = {port0, port1};
+    createPool(client, poolName, poolHosts, poolPorts, true);
+
+    client.invoke(() -> {
+      // GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException" is not set
+      // to true by this test, so the retry is expected to stay disabled.
+      TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true;
+      try {
+        QueryService poolQueryService = PoolManager.find(poolName).getQueryService();
+        logger.info("### Executing Query on server: " + queryString[1]);
+        Query failingQuery = poolQueryService.newQuery(queryString[1]);
+        assertThatThrownBy(failingQuery::execute)
+            .isInstanceOf(ServerOperationException.class)
+            .hasCauseInstanceOf(SerializationException.class);
+        // One instance was created before the failure, and the flag was not switched off.
+        assertThat(TestObjectThrowsPdxSerializationException.numInstance.get()).isEqualTo(1);
+        assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization)
+            .isTrue();
+      } finally {
+        TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = false;
+        TestObjectThrowsPdxSerializationException.numInstance.set(0);
+      }
+    });
+  }
+
+  /**
+   * Even with {@code enableQueryRetryOnPdxSerializationException} set to true, a plain
+   * {@code SerializationException} thrown during deserialization is expected to fail the query
+   * with a {@code ServerOperationException} after exactly one instance was created in the client
+   * VM.
+   */
+  @Test
+  public void testNotToRetryOnRegularSerializationException() throws CacheException {
+    // Populate the region through server0.
+    server0.invoke(() -> {
+      Region<Object, Object> serverRegion = getRootRegion().getSubregion(regionName);
+      IntStream.range(0, numberOfEntries).forEach(
+          i -> serverRegion.put("key-" + i, new TestObjectThrowsSerializationException()));
+    });
+
+    // Client pool over two servers only.
+    final String[] poolHosts = {hostName, hostName};
+    final int[] poolPorts = {port0, port1};
+    createPool(client, poolName, poolHosts, poolPorts, true);
+
+    final String retryProperty =
+        GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException";
+
+    client.invoke(() -> {
+      try {
+        System.setProperty(retryProperty, "true");
+        TestObjectThrowsSerializationException.throwExceptionOnDeserialization = true;
+        QueryService poolQueryService = PoolManager.find(poolName).getQueryService();
+        logger.info("### Executing Query on server: " + queryString[1]);
+        Query failingQuery = poolQueryService.newQuery(queryString[1]);
+        assertThatThrownBy(failingQuery::execute)
+            .isInstanceOf(ServerOperationException.class)
+            .hasCauseInstanceOf(SerializationException.class);
+        // One instance was created before the failure, and the flag was not switched off.
+        assertThat(TestObjectThrowsSerializationException.numInstance.get()).isEqualTo(1);
+        assertThat(TestObjectThrowsSerializationException.throwExceptionOnDeserialization)
+            .isTrue();
+      } finally {
+        TestObjectThrowsSerializationException.throwExceptionOnDeserialization = false;
+        TestObjectThrowsSerializationException.numInstance.set(0);
+        System.setProperty(retryProperty, "false");
+      }
+    });
+  }
+
+  /**
+   * With {@code enableQueryRetryOnPdxSerializationException} set to true but a pool over only two
+   * servers, the query is expected to end in a {@code ServerConnectivityException} after two
+   * instances were created in the client VM, i.e. the retry does not go on forever.
+   */
+  @Test
+  public void testRetryFailedWithServerConnectivityException() throws CacheException {
+    // create pdx instance at servers
+    server0.invoke(() -> {
+      Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+      for (int i = 0; i < numberOfEntries; i++) {
+        region.put("key-" + i, new TestObjectThrowsPdxSerializationException());
+      }
+    });
+
+    // Create client pool with only 2 servers to test that retry will not run forever
+    createPool(client, poolName, new String[] {hostName, hostName},
+        new int[] {port0, port1}, true);
+
+    client.invoke(() -> {
+      try {
+        System.setProperty(
+            GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true");
+        TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true;
+        QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+        logger.info("### Executing Query on server: " + queryString[1]);
+        Query query = remoteQueryService.newQuery(queryString[1]);
+        assertThatThrownBy(query::execute).isInstanceOf(ServerConnectivityException.class);
+        assertThat(TestObjectThrowsPdxSerializationException.numInstance.get()).isEqualTo(2);
+        // fromData switches the flag off by itself once the second instance has failed
+        assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization)
+            .isFalse();
+      } finally {
+        // Reset the flag explicitly: if an assertion above fails before fromData has switched it
+        // off, the static would otherwise stay set in this VM.
+        TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = false;
+        TestObjectThrowsPdxSerializationException.numInstance.set(0);
+        System.setProperty(
+            GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false");
+      }
+    });
+  }
+
+  /**
+   * Field-less PDX test value whose {@link #fromData(PdxReader)} throws a
+   * {@link PdxSerializationException} while {@code throwExceptionOnDeserialization} is set.
+   * Every construction through the no-arg constructor is counted in {@link #numInstance}.
+   */
+  public static class TestObjectThrowsPdxSerializationException implements PdxSerializable {
+    // Static, so it applies to every instance in the JVM; the tests set and reset it.
+    private static boolean throwExceptionOnDeserialization = false;
+    /** Number of instances constructed in this JVM since the counter was last reset. */
+    public static final AtomicInteger numInstance = new AtomicInteger();
+
+    public TestObjectThrowsPdxSerializationException() {
+      numInstance.incrementAndGet();
+    }
+
+    /** Writes nothing; the class has no instance fields. */
+    @Override
+    public void toData(PdxWriter writer) {}
+
+    /**
+     * Throws while the failure flag is set. Once at least two instances have been constructed,
+     * the flag is cleared before throwing, so the call after that one no longer fails.
+     */
+    @Override
+    public void fromData(PdxReader reader) {
+      if (throwExceptionOnDeserialization) {
+        if (numInstance.get() >= 2) {
+          // after retried 2 servers, let the retry to 3rd server succeed
+          throwExceptionOnDeserialization = false;
+        }
+        throw new PdxSerializationException("Deserialization is expected to fail in this VM");
+      }
+    }
+  }
+
+  /**
+   * Field-less PDX test value whose {@link #fromData(PdxReader)} throws a plain
+   * {@link SerializationException} (not a {@code PdxSerializationException}) while
+   * {@code throwExceptionOnDeserialization} is set. Every construction through the no-arg
+   * constructor is counted in {@link #numInstance}.
+   */
+  public static class TestObjectThrowsSerializationException implements PdxSerializable {
+    // Static, so it applies to every instance in the JVM; the tests set and reset it.
+    private static boolean throwExceptionOnDeserialization = false;
+    /** Number of instances constructed in this JVM since the counter was last reset. */
+    public static final AtomicInteger numInstance = new AtomicInteger();
+
+    public TestObjectThrowsSerializationException() {
+      numInstance.incrementAndGet();
+    }
+
+    /** Writes nothing; the class has no instance fields. */
+    @Override
+    public void toData(PdxWriter writer) {}
+
+    /**
+     * Throws while the failure flag is set. Once at least two instances have been constructed,
+     * the flag is cleared before throwing, so the call after that one no longer fails.
+     */
+    @Override
+    public void fromData(PdxReader reader) {
+      if (throwExceptionOnDeserialization) {
+        if (numInstance.get() >= 2) {
+          // Stop failing after the second instance so that an unexpected retry cannot loop.
+          // NOTE(review): the test using this class asserts numInstance == 1, so this branch is
+          // presumably never reached there - confirm.
+          throwExceptionOnDeserialization = false;
+        }
+        throw new SerializationException("Deserialization is expected to fail in this VM");
+      }
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
index 0fa0067..9ff3777 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
@@ -113,7 +113,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
// Execute query with different type of Results.
QueryService qs = getCache().getQueryService();
@@ -140,7 +140,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
// Pdx objects for local queries now get deserialized when results are iterated.
// So the deserialized objects are no longer cached in VMCachedDeserializable.
- assertEquals(numberOfEntries * 2, TestObject.numInstance);
+ assertEquals(numberOfEntries * 2, TestObject.numInstance.get());
}
});
@@ -188,7 +188,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
System.out.println("##### Region size is: " + region.size());
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -232,7 +232,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -280,7 +280,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
System.out.println("##### Region size is: " + region.size());
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -328,7 +328,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -380,7 +380,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
System.out.println("##### Region size is: " + region.size());
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -426,7 +426,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -550,7 +550,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
- assertEquals(2 * numberOfEntries, TestObject.numInstance);
+ assertEquals(2 * numberOfEntries, TestObject.numInstance.get());
}
};
@@ -561,7 +561,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -856,7 +856,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
Assert.fail("Failed executing " + queryStr, e);
}
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
};
@@ -867,7 +867,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1149,7 +1149,8 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
- assertEquals(2 * (numberOfEntries + 5), (TestObject.numInstance + TestObject2.numInstance));
+ assertEquals(2 * (numberOfEntries + 5),
+ (TestObject.numInstance.get() + TestObject2.numInstance.get()));
}
};
@@ -1160,7 +1161,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1246,7 +1247,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
Assert.fail("Failed executing " + queryString[i], e);
}
}
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1254,7 +1255,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1263,7 +1264,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1272,7 +1273,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1302,7 +1303,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
}
- if (TestObject.numInstance <= 0) {
+ if (TestObject.numInstance.get() <= 0) {
fail("Expected TestObject instance to be >= 0.");
}
}
@@ -1313,7 +1314,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1623,7 +1624,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
} catch (Exception ex) {
fail("Unable to create index. " + ex.getMessage());
}
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1643,7 +1644,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
} catch (Exception ex) {
fail("Unable to create index. " + ex.getMessage());
}
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1683,14 +1684,14 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1731,7 +1732,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
- assertEquals(4 * numberOfEntries, TestObject.numInstance);
+ assertEquals(4 * numberOfEntries, TestObject.numInstance.get());
for (int i = 3; i < queryString.length; i++) {
try {
@@ -1763,7 +1764,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1772,7 +1773,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1892,7 +1893,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1920,7 +1921,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -2560,14 +2561,14 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("validate") {
@Override
public void run2() throws CacheException {
- assertEquals(testObjectCnt, TestObject.numInstance);
+ assertEquals(testObjectCnt, TestObject.numInstance.get());
assertEquals(positionObjectCnt, PositionPdx.numInstance);
- assertEquals(testObjCnt, TestObject2.numInstance);
+ assertEquals(testObjCnt, TestObject2.numInstance.get());
// Reset the instances
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PositionPdx.numInstance = 0;
- TestObject2.numInstance = 0;
+ TestObject2.numInstance.set(0);
}
});
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java
index 0a37be0..1675cae 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.client.internal;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.geode.SerializationException;
@@ -31,6 +32,9 @@ import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ObjectPartList;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.pdx.PdxSerializationException;
+import org.apache.geode.util.internal.GeodeGlossary;
/**
* Does a region query on a server
@@ -120,7 +124,29 @@ public class QueryOp {
queryResult = resultPart.getObject();
} catch (Exception e) {
String s = "While deserializing " + getOpName() + " result";
- exceptionRef[0] = new SerializationException(s, e);
+
+ // Workaround: convert a PdxSerializationException into an IOException so that the query is
+ // retried. It only works when the client is configured to connect to more than one cache
+ // server AND the pool's "retry-attempts" is -1 (the default, which means try each server)
+ // or > 0.
+ // It is possible that closing the current connection, getting a new connection to the same
+ // server, and retrying the query on it would also work around this issue. That approach
+ // would not need multiple servers and would not depend on the retry-attempts configuration.
+ boolean enableQueryRetryOnPdxSerializationException = Boolean.getBoolean(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException");
+ if (e instanceof PdxSerializationException
+ && enableQueryRetryOnPdxSerializationException) {
+ // IOException will allow the client to retry next server in the connection pool until
+ // exhausted all the servers (so it will not retry forever). Why retry:
+ // The byte array of the pdxInstance is always the same at the server. Other clients can
+ // get a correct one from query response message. Even this client can get it correctly
+ // before and after the PdxSerializationException.
+ exceptionRef[0] = new IOException(s, e);
+ LogService.getLogger().warn(
+ "Encountered unexpected PdxSerializationException, retrying on another server");
+ } else {
+ exceptionRef[0] = new SerializationException(s, e);
+ }
return;
}
if (queryResult instanceof Throwable) {