You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2021/03/08 17:56:54 UTC
[geode] 01/02: GEODE-8886 fix (#6059)
This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9b26445521d86a985d140eceaf160ea6c60feae1
Author: mhansonp <ha...@vmware.com>
AuthorDate: Tue Mar 2 14:26:36 2021 -0800
GEODE-8886 fix (#6059)
Getting rid of a write and a read in the wrong place...
sanctionedDataSerializables.txt needed updating.
New test for encoding and decoding of packets.
New test for backwards compatibility
(cherry picked from commit dcb1096130925e2794010c7a7ab8accfbf49cb3e)
---
.../codeAnalysis/sanctionedDataSerializables.txt | 4 +-
.../internal/cache/wan/GatewaySenderAdvisor.java | 2 -
.../cache/wan/GatewaySenderAdvisorTest.java | 93 ++++++++++++++++++++++
...ANRollingUpgradeVerifyGatewaySenderProfile.java | 58 ++++++++++++--
4 files changed, 146 insertions(+), 11 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 058733c..03e7c5b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1912,10 +1912,10 @@ toData,254
org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,6
fromData,17
-fromDataPre_GEODE_1_14_0_0,293
+fromDataPre_GEODE_1_14_0_0,283
fromDataPre_GFE_8_0_0_0,188
toData,17
-toDataPre_GEODE_1_14_0_0,281
+toDataPre_GEODE_1_14_0_0,271
toDataPre_GFE_8_0_0_0,236
org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index 6af0866..38b608c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -595,7 +595,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
this.serverLocation = new ServerLocation();
InternalDataSerializer.invokeFromData(this.serverLocation, in);
}
- this.enforceThreadsConnectSameReceiver = in.readBoolean();
}
@Override
@@ -641,7 +640,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
if (serverLocationFound) {
InternalDataSerializer.invokeToData(serverLocation, out);
}
- out.writeBoolean(enforceThreadsConnectSameReceiver);
}
public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext context)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisorTest.java
new file mode 100644
index 0000000..d130e93
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.internal.serialization.VersionedDataInputStream;
+import org.apache.geode.internal.serialization.VersionedDataOutputStream;
+import org.apache.geode.internal.tcp.ByteBufferInputStream;
+
+public class GatewaySenderAdvisorTest {
+
+ @Before
+ public void setUp() throws Exception {}
+
+ @After
+ public void tearDown() throws Exception {}
+
+
+ @Test
+ public void testGatewaySenderProfileSerializeAndDeserializeCurrent()
+ throws IOException, ClassNotFoundException {
+ InternalDistributedMember internalDistributedMember =
+ new InternalDistributedMember("localhost", 8888);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile =
+ new GatewaySenderAdvisor.GatewaySenderProfile(internalDistributedMember, 1);
+
+
+ VersionedDataOutputStream versionedDataOutputStream =
+ new VersionedDataOutputStream(byteArrayOutputStream, KnownVersion.CURRENT);
+ DataSerializer.writeObject(gatewaySenderProfile, versionedDataOutputStream);
+ versionedDataOutputStream.flush();
+
+ ByteBuffer bb = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+ ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(bb);
+
+ VersionedDataInputStream versionedDataInputStream =
+ new VersionedDataInputStream(byteBufferInputStream, KnownVersion.CURRENT);
+ GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile2 =
+ DataSerializer.readObject(versionedDataInputStream);
+ assertThat(gatewaySenderProfile).isEqualTo(gatewaySenderProfile2);
+ }
+
+
+ @Test
+ public void testGatewaySenderProfileSerializeAndDeserialize113()
+ throws IOException, ClassNotFoundException {
+ InternalDistributedMember internalDistributedMember =
+ new InternalDistributedMember("localhost", 8888);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile =
+ new GatewaySenderAdvisor.GatewaySenderProfile(internalDistributedMember, 1);
+
+
+ VersionedDataOutputStream versionedDataOutputStream =
+ new VersionedDataOutputStream(byteArrayOutputStream, KnownVersion.GEODE_1_13_0);
+ DataSerializer.writeObject(gatewaySenderProfile, versionedDataOutputStream);
+ versionedDataOutputStream.flush();
+
+ ByteBuffer bb = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+ ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(bb);
+
+ VersionedDataInputStream versionedDataInputStream =
+ new VersionedDataInputStream(byteBufferInputStream, KnownVersion.GEODE_1_13_0);
+ GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile2 =
+ DataSerializer.readObject(versionedDataInputStream);
+ assertThat(gatewaySenderProfile).isEqualTo(gatewaySenderProfile2);
+ }
+}
diff --git a/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java b/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java
index 8d70ec7..02d3603 100644
--- a/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java
+++ b/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java
@@ -23,25 +23,23 @@ import org.junit.Test;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.test.dunit.DistributedTestUtils;
-import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.version.VersionManager;
public class WANRollingUpgradeVerifyGatewaySenderProfile extends WANRollingUpgradeDUnitTest {
@Test
+
// This test verifies that a GatewaySenderProfile serializes properly between versions.
public void testVerifyGatewaySenderProfile() {
- final Host host = Host.getHost(0);
- VM oldLocator = host.getVM(oldVersion, 0);
- VM oldServer = host.getVM(oldVersion, 1);
- VM currentServer = host.getVM(VersionManager.CURRENT_VERSION, 2);
+ VM oldLocator = VM.getVM(oldVersion, 0);
+ VM oldServer = VM.getVM(oldVersion, 1);
+ VM currentServer = VM.getVM(VersionManager.CURRENT_VERSION, 2);
// Start locator
final int port = getRandomAvailableTCPPort();
oldLocator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port));
- final String locators = NetworkUtils.getServerHostName(host) + "[" + port + "]";
+ final String locators = VM.getHostName() + "[" + port + "]";
oldLocator.invoke(() -> startLocator(port, 0, locators, ""));
IgnoredException ie =
@@ -73,4 +71,50 @@ public class WANRollingUpgradeVerifyGatewaySenderProfile extends WANRollingUpgra
ie.remove();
}
}
+
+ @Test
+ // This test verifies that a GatewaySenderProfile serializes properly between versions.
+ public void testOldServerCanUnderstandNewGatewaySenderProfile() {
+
+ VM oldLocator = VM.getVM(oldVersion, 0);
+ VM oldServer = VM.getVM(oldVersion, 1);
+ VM currentServer = VM.getVM(VersionManager.CURRENT_VERSION, 2);
+
+ // Start locator
+ final int port = getRandomAvailableTCPPort();
+ oldLocator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port));
+ final String locators = VM.getHostName() + "[" + port + "]";
+ oldLocator.invoke(() -> startLocator(port, 0, locators, ""));
+
+ IgnoredException ie =
+ IgnoredException.addIgnoredException("could not get remote locator information");
+ try {
+ String senderId = getName() + "_gatewaysender";
+
+ // Start current server
+ currentServer.invoke(() -> createCache(locators));
+
+ // Create GatewaySender in new server
+ currentServer.invoke(() -> createGatewaySender(senderId, 10,
+ ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL));
+
+ // Locators before 1.4 handled configuration asynchronously.
+ // We must wait for configuration configuration to be ready, or confirm that it is disabled.
+ oldLocator.invoke(
+ () -> await()
+ .untilAsserted(() -> assertTrue(
+ !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
+ || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+ // Start old server
+ oldServer.invoke(() -> createCache(locators));
+
+ // Attempt to create GatewaySender in old server
+ oldServer.invoke(() -> createGatewaySender(senderId, 10,
+ ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL));
+
+ } finally {
+ ie.remove();
+ }
+ }
}