You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2021/07/12 20:55:09 UTC
[geode] branch support/1.12 updated: GEODE-9346: When client received incorrect byte array of PdxType due … (#6561)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new ab08366 GEODE-9346: When client received incorrect byte array of PdxType due … (#6561)
ab08366 is described below
commit ab0836686ddb24544e931201340a707236c1ae29
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Jul 9 10:54:17 2021 -0700
GEODE-9346: When client received incorrect byte array of PdxType due … (#6561)
(cherry picked from commit 11e4c3a4ca4bf7ef2203e0fdd111e536e5721e50)
---
.../geode/cache/query/dunit/PDXQueryTestBase.java | 19 +-
.../cache/query/dunit/PdxLocalQueryDUnitTest.java | 6 +-
.../query/dunit/PdxMultiThreadQueryDUnitTest.java | 368 +++++++++++++++++++++
.../geode/cache/query/dunit/PdxQueryDUnitTest.java | 67 ++--
.../geode/cache/client/internal/QueryOp.java | 28 +-
5 files changed, 442 insertions(+), 46 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
index 94941a7..e975996 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
@@ -81,7 +82,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
preTearDownPDXQueryTestBase();
disconnectAllFromDS(); // tests all expect to create a new ds
// Reset the testObject numinstance for the next test.
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
// In all VM.
resetTestObjectInstanceCount();
}
@@ -95,11 +96,11 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
vm.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PortfolioPdx.numInstance = 0;
PositionPdx.numInstance = 0;
PositionPdx.cnt = 0;
- TestObject2.numInstance = 0;
+ TestObject2.numInstance.set(0);
}
});
}
@@ -305,15 +306,15 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
public static class TestObject2 implements PdxSerializable {
public int _id;
- public static int numInstance = 0;
+ public static AtomicInteger numInstance = new AtomicInteger();
public TestObject2() {
- numInstance++;
+ numInstance.incrementAndGet();
}
public TestObject2(int id) {
this._id = id;
- numInstance++;
+ numInstance.incrementAndGet();
}
public int getId() {
@@ -358,7 +359,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
public int important;
public int selection;
public int select;
- public static int numInstance = 0;
+ public static final AtomicInteger numInstance = new AtomicInteger();
public Map idTickers = new HashMap();
public HashMap positions = new HashMap();
public TestObject2 test;
@@ -367,7 +368,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
if (log != null) {
log.info("TestObject ctor stack trace", new Exception());
}
- numInstance++;
+ numInstance.incrementAndGet();
}
public TestObject(int id, String ticker) {
@@ -380,7 +381,7 @@ public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
this.important = id;
this.selection = id;
this.select = id;
- numInstance++;
+ numInstance.incrementAndGet();
idTickers.put(id + "", ticker);
this.test = new TestObject2(id);
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
index f85e333..43149bf 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
@@ -898,7 +898,7 @@ public class PdxLocalQueryDUnitTest extends PDXQueryTestBase {
protected final void preTearDownPDXQueryTestBase() throws Exception {
disconnectAllFromDS(); // tests all expect to create a new ds
// Reset the testObject numinstance for the next test.
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PortfolioPdx.DEBUG = false;
// In all VM.
resetTestObjectInstanceCount();
@@ -916,11 +916,11 @@ public class PdxLocalQueryDUnitTest extends PDXQueryTestBase {
vm.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PortfolioPdx.numInstance = 0;
PositionPdx.numInstance = 0;
PositionPdx.cnt = 0;
- TestObject2.numInstance = 0;
+ TestObject2.numInstance.set(0);
}
});
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java
new file mode 100644
index 0000000..6e1bc7e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.SerializationException;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.cache.query.FunctionDomainException;
+import org.apache.geode.cache.query.NameResolutionException;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryInvocationTargetException;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.TypeMismatchException;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxSerializationException;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.version.VersionManager;
+import org.apache.geode.util.internal.GeodeGlossary;
+
+@Category({OQLQueryTest.class})
+public class PdxMultiThreadQueryDUnitTest extends PDXQueryTestBase {
+ public static final Logger logger = LogService.getLogger();
+ static final int numberOfEntries = 100;
+ final String poolName = "testClientServerQueryPool";
+ final String hostName = NetworkUtils.getServerHostName();
+ private VM server0;
+ private VM server1;
+ private VM server2;
+ private VM client;
+ private int port0;
+ private int port1;
+ private int port2;
+
+ public PdxMultiThreadQueryDUnitTest() {
+ super();
+ }
+
+ @Before
+ public void startUpServersAndClient() {
+ final Host host = Host.getHost(0);
+
+ server0 = host.getVM(VersionManager.CURRENT_VERSION, 0);
+ server1 = host.getVM(VersionManager.CURRENT_VERSION, 1);
+ server2 = host.getVM(VersionManager.CURRENT_VERSION, 2);
+ client = host.getVM(VersionManager.CURRENT_VERSION, 3);
+
+ // Start servers
+ for (VM vm : Arrays.asList(server0, server1, server2)) {
+ vm.invoke((SerializableRunnableIF) this::configAndStartBridgeServer);
+ }
+
+ port0 = server0.invoke(PdxQueryDUnitTest::getCacheServerPort);
+ port1 = server1.invoke(PdxQueryDUnitTest::getCacheServerPort);
+ port2 = server2.invoke(PdxQueryDUnitTest::getCacheServerPort);
+ }
+
+ @After
+ public void closeServersAndClient() {
+ closeClient(client);
+ closeClient(server2);
+ closeClient(server1);
+ closeClient(server0);
+ }
+
+ @Test
+ public void testClientServerQuery() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObject(i, "vmware"));
+ }
+ });
+
+ // Create client region
+ client.invoke(() -> {
+ ClientCacheFactory cf = new ClientCacheFactory();
+ cf.addPoolServer(hostName, port1);
+ cf.addPoolServer(hostName, port2);
+ cf.addPoolServer(hostName, port0);
+ cf.setPdxReadSerialized(false);
+ ClientCache clientCache = getClientCache(cf);
+ Region<Object, Object> region =
+ clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+
+ logger.info("### Executing Query on server: " + queryString[1] + ": from client region: "
+ + region.getFullPath());
+ final int size = 100;
+ IntStream.range(0, size).parallel().forEach(a -> {
+ try {
+ SelectResults<TestObjectThrowsPdxSerializationException> selectResults =
+ region.query(queryString[1]);
+ assertThat(selectResults.size()).isEqualTo(numberOfEntries);
+ } catch (FunctionDomainException | TypeMismatchException | NameResolutionException
+ | QueryInvocationTargetException e) {
+ fail("Unexpected query exception:" + e.getMessage());
+ }
+ });
+ await().until(() -> TestObject.numInstance.get() == size * numberOfEntries);
+ });
+ }
+
+ @Test
+ public void testClientServerQueryUsingRemoteQueryService()
+ throws CacheException, InterruptedException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObject(i, "vmware"));
+ }
+ });
+
+ // Create client pool.
+ createPool(client, poolName, new String[] {hostName, hostName, hostName},
+ new int[] {port0, port1, port2}, true);
+
+ final int size = 100;
+ AsyncInvocation[] asyncInvocationArray = new AsyncInvocation[size];
+ for (int i = 0; i < size; i++) {
+ asyncInvocationArray[i] =
+ client.invokeAsync(() -> {
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ SelectResults<TestObjectThrowsPdxSerializationException> selectResults =
+ (SelectResults) query.execute();
+ assertThat(selectResults.size()).isEqualTo(numberOfEntries);
+ });
+ }
+
+ for (int i = 0; i < size; i++) {
+ asyncInvocationArray[i].await();
+ }
+ client
+ .invoke(() -> await().until(() -> TestObject.numInstance.get() == size * numberOfEntries));
+ }
+
+ @Test
+ public void testRetrySucceedWithPdxSerializationException() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObjectThrowsPdxSerializationException());
+ }
+ });
+
+ // Create client pool with 3 servers
+ createPool(client, poolName, new String[] {hostName, hostName, hostName},
+ new int[] {port0, port1, port2}, true);
+
+ client.invoke(() -> {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true");
+ try {
+ TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true;
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ SelectResults<TestObjectThrowsPdxSerializationException> selectResults =
+ (SelectResults) query.execute();
+ assertThat(selectResults.size()).isEqualTo(numberOfEntries);
+ // each of the 2 failed attempts incremented numInstance
+ assertThat(numberOfEntries + 2)
+ .isEqualTo(TestObjectThrowsPdxSerializationException.numInstance.get());
+ } finally {
+ assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization)
+ .isFalse();
+ TestObjectThrowsPdxSerializationException.numInstance.set(0);
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false");
+ }
+ });
+ }
+
+ @Test
+ public void testRetryNotEnabledBySystemProperty() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObjectThrowsPdxSerializationException());
+ }
+ });
+
+ // Create client pool with only 2 servers to test that retry will not run forever
+ createPool(client, poolName, new String[] {hostName, hostName},
+ new int[] {port0, port1}, true);
+
+ client.invoke(() -> {
+ try {
+ // If the client did not explicitly specify GeodeGlossary.GEMFIRE_PREFIX +
+ // "enableQueryRetryOnPdxSerializationException" to true, retry will not be enabled
+ TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true;
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ assertThatThrownBy(query::execute).isInstanceOf(ServerOperationException.class)
+ .hasCauseInstanceOf(SerializationException.class);
+ assertThat(TestObjectThrowsPdxSerializationException.numInstance.get()).isEqualTo(1);
+ assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization)
+ .isTrue();
+ } finally {
+ TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = false;
+ TestObjectThrowsPdxSerializationException.numInstance.set(0);
+ }
+ });
+ }
+
+ @Test
+ public void testNotToRetryOnRegularSerializationException() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObjectThrowsSerializationException());
+ }
+ });
+
+ // Create client pool with only 2 servers to test that retry will not run forever
+ createPool(client, poolName, new String[] {hostName, hostName},
+ new int[] {port0, port1}, true);
+
+ client.invoke(() -> {
+ try {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true");
+ TestObjectThrowsSerializationException.throwExceptionOnDeserialization = true;
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ assertThatThrownBy(query::execute).isInstanceOf(ServerOperationException.class)
+ .hasCauseInstanceOf(SerializationException.class);
+ assertThat(TestObjectThrowsSerializationException.numInstance.get()).isEqualTo(1);
+ assertThat(TestObjectThrowsSerializationException.throwExceptionOnDeserialization)
+ .isTrue();
+ } finally {
+ TestObjectThrowsSerializationException.throwExceptionOnDeserialization = false;
+ TestObjectThrowsSerializationException.numInstance.set(0);
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false");
+ }
+ });
+ }
+
+ @Test
+ public void testRetryFailedWithServerConnectivityException() throws CacheException {
+ // create pdx instance at servers
+ server0.invoke(() -> {
+ Region<Object, Object> region = getRootRegion().getSubregion(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put("key-" + i, new TestObjectThrowsPdxSerializationException());
+ }
+ });
+
+ // Create client pool with only 2 servers to test that retry will not run forever
+ createPool(client, poolName, new String[] {hostName, hostName},
+ new int[] {port0, port1}, true);
+
+ client.invoke(() -> {
+ try {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true");
+ TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true;
+ QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ logger.info("### Executing Query on server: " + queryString[1]);
+ Query query = remoteQueryService.newQuery(queryString[1]);
+ assertThatThrownBy(query::execute).isInstanceOf(ServerConnectivityException.class);
+ assertThat(TestObjectThrowsPdxSerializationException.numInstance.get()).isEqualTo(2);
+ assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization)
+ .isFalse();
+ } finally {
+ TestObjectThrowsPdxSerializationException.numInstance.set(0);
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false");
+ }
+ });
+ }
+
+ public static class TestObjectThrowsPdxSerializationException implements PdxSerializable {
+ private static boolean throwExceptionOnDeserialization = false;
+ public static AtomicInteger numInstance = new AtomicInteger();
+
+ public TestObjectThrowsPdxSerializationException() {
+ numInstance.incrementAndGet();
+ }
+
+ @Override
+ public void toData(PdxWriter writer) {}
+
+ @Override
+ public void fromData(PdxReader reader) {
+ if (throwExceptionOnDeserialization) {
+ if (numInstance.get() >= 2) {
+ // after 2 servers have been tried, let the retry to the 3rd server succeed
+ throwExceptionOnDeserialization = false;
+ }
+ throw new PdxSerializationException("Deserialization is expected to fail in this VM");
+ }
+ }
+ }
+
+ public static class TestObjectThrowsSerializationException implements PdxSerializable {
+ private static boolean throwExceptionOnDeserialization = false;
+ public static AtomicInteger numInstance = new AtomicInteger();
+
+ public TestObjectThrowsSerializationException() {
+ numInstance.incrementAndGet();
+ }
+
+ @Override
+ public void toData(PdxWriter writer) {}
+
+ @Override
+ public void fromData(PdxReader reader) {
+ if (throwExceptionOnDeserialization) {
+ if (numInstance.get() >= 2) {
+ // after 2 servers have been tried, let the retry to the 3rd server succeed
+ throwExceptionOnDeserialization = false;
+ }
+ throw new SerializationException("Deserialization is expected to fail in this VM");
+ }
+ }
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
index 9494b77..d19d0a5 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
@@ -112,7 +112,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
public void run2() throws CacheException {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
// Execute query with different type of Results.
QueryService qs = getCache().getQueryService();
@@ -139,7 +139,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
// Pdx objects for local queries now get deserialized when results are iterated.
// So the deserialized objects are no longer cached in VMCachedDeserializable.
- assertEquals(numberOfEntries * 2, TestObject.numInstance);
+ assertEquals(numberOfEntries * 2, TestObject.numInstance.get());
}
});
@@ -187,7 +187,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
System.out.println("##### Region size is: " + region.size());
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -231,7 +231,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -279,7 +279,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
System.out.println("##### Region size is: " + region.size());
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -327,7 +327,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -379,7 +379,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
configAndStartBridgeServer();
Region region = getRootRegion().getSubregion(regionName);
System.out.println("##### Region size is: " + region.size());
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -425,7 +425,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -549,7 +549,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
- assertEquals(2 * numberOfEntries, TestObject.numInstance);
+ assertEquals(2 * numberOfEntries, TestObject.numInstance.get());
}
};
@@ -560,7 +560,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -855,7 +855,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
Assert.fail("Failed executing " + queryStr, e);
}
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
};
@@ -866,7 +866,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1145,7 +1145,8 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
- assertEquals(2 * (numberOfEntries + 5), (TestObject.numInstance + TestObject2.numInstance));
+ assertEquals(2 * (numberOfEntries + 5),
+ (TestObject.numInstance.get() + TestObject2.numInstance.get()));
}
};
@@ -1156,7 +1157,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1242,7 +1243,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
Assert.fail("Failed executing " + queryString[i], e);
}
}
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1250,7 +1251,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1259,7 +1260,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1268,7 +1269,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1298,7 +1299,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
}
- if (TestObject.numInstance <= 0) {
+ if (TestObject.numInstance.get() <= 0) {
fail("Expected TestObject instance to be >= 0.");
}
}
@@ -1309,7 +1310,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm2.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1619,7 +1620,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
} catch (Exception ex) {
fail("Unable to create index. " + ex.getMessage());
}
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1639,7 +1640,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
} catch (Exception ex) {
fail("Unable to create index. " + ex.getMessage());
}
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1679,14 +1680,14 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1727,7 +1728,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
}
}
- assertEquals(4 * numberOfEntries, TestObject.numInstance);
+ assertEquals(4 * numberOfEntries, TestObject.numInstance.get());
for (int i = 3; i < queryString.length; i++) {
try {
@@ -1759,7 +1760,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm0.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(numberOfEntries, TestObject.numInstance);
+ assertEquals(numberOfEntries, TestObject.numInstance.get());
}
});
@@ -1768,7 +1769,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1888,7 +1889,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -1916,7 +1917,7 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("Create cache server") {
@Override
public void run2() throws CacheException {
- assertEquals(0, TestObject.numInstance);
+ assertEquals(0, TestObject.numInstance.get());
}
});
@@ -2556,14 +2557,14 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
vm1.invoke(new CacheSerializableRunnable("validate") {
@Override
public void run2() throws CacheException {
- assertEquals(testObjectCnt, TestObject.numInstance);
+ assertEquals(testObjectCnt, TestObject.numInstance.get());
assertEquals(positionObjectCnt, PositionPdx.numInstance);
- assertEquals(testObjCnt, TestObject2.numInstance);
+ assertEquals(testObjCnt, TestObject2.numInstance.get());
// Reset the instances
- TestObject.numInstance = 0;
+ TestObject.numInstance.set(0);
PositionPdx.numInstance = 0;
- TestObject2.numInstance = 0;
+ TestObject2.numInstance.set(0);
}
});
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java
index c7dc14d..bcdd7f3 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.client.internal;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.geode.SerializationException;
@@ -31,6 +32,9 @@ import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ObjectPartList;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.pdx.PdxSerializationException;
+import org.apache.geode.util.internal.GeodeGlossary;
/**
* Does a region query on a server
@@ -122,7 +126,29 @@ public class QueryOp {
queryResult = resultPart.getObject();
} catch (Exception e) {
String s = "While deserializing " + getOpName() + " result";
- exceptionRef[0] = new SerializationException(s, e);
+
+ // Workaround: convert PdxSerializationException into IOException so the op is retried.
+ // It only works when the client is configured to connect to more than one cache server
+ // AND the pool's "retry-attempts" is -1 or > 0.
+ // Alternatively, the application could close the current connection, get a new
+ // connection to the same server, and retry the query on it. That might also work
+ // around this issue without requiring multiple servers or depending on the
+ // retry-attempts configuration.
+ boolean enableQueryRetryOnPdxSerializationException = Boolean.getBoolean(
+ GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException");
+ if (e instanceof PdxSerializationException
+ && enableQueryRetryOnPdxSerializationException) {
+ // An IOException lets the client retry on the next server in the connection pool until
+ // all the servers are exhausted (so it will not retry forever). Why retry:
+ // The byte array of the pdxInstance is always the same at the server. Other clients
+ // can get a correct one from the query response message. Even this client can get it
+ // correctly before and after the PdxSerializationException.
+ exceptionRef[0] = new IOException(s, e);
+ LogService.getLogger().warn(
+ "Encountered unexpected PdxSerializationException, retrying on another server");
+ } else {
+ exceptionRef[0] = new SerializationException(s, e);
+ }
return;
}
if (queryResult instanceof Throwable) {