You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2021/03/08 17:56:54 UTC

[geode] 01/02: GEODE-8886 fix (#6059)

This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9b26445521d86a985d140eceaf160ea6c60feae1
Author: mhansonp <ha...@vmware.com>
AuthorDate: Tue Mar 2 14:26:36 2021 -0800

    GEODE-8886 fix (#6059)
    
    Removed a write and a read of enforceThreadsConnectSameReceiver that were in the wrong place (the pre-1.14 toData/fromData methods).
    sanctionedDataSerializables.txt needed updating.
    New test for encoding and decoding of packets.
    New test for backwards compatibility
    
    (cherry picked from commit dcb1096130925e2794010c7a7ab8accfbf49cb3e)
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |  4 +-
 .../internal/cache/wan/GatewaySenderAdvisor.java   |  2 -
 .../cache/wan/GatewaySenderAdvisorTest.java        | 93 ++++++++++++++++++++++
 ...ANRollingUpgradeVerifyGatewaySenderProfile.java | 58 ++++++++++++--
 4 files changed, 146 insertions(+), 11 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 058733c..03e7c5b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1912,10 +1912,10 @@ toData,254
 
 org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,6
 fromData,17
-fromDataPre_GEODE_1_14_0_0,293
+fromDataPre_GEODE_1_14_0_0,283
 fromDataPre_GFE_8_0_0_0,188
 toData,17
-toDataPre_GEODE_1_14_0_0,281
+toDataPre_GEODE_1_14_0_0,271
 toDataPre_GFE_8_0_0_0,236
 
 org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index 6af0866..38b608c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -595,7 +595,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
         this.serverLocation = new ServerLocation();
         InternalDataSerializer.invokeFromData(this.serverLocation, in);
       }
-      this.enforceThreadsConnectSameReceiver = in.readBoolean();
     }
 
     @Override
@@ -641,7 +640,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
       if (serverLocationFound) {
         InternalDataSerializer.invokeToData(serverLocation, out);
       }
-      out.writeBoolean(enforceThreadsConnectSameReceiver);
     }
 
     public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext context)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisorTest.java
new file mode 100644
index 0000000..d130e93
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.internal.serialization.VersionedDataInputStream;
+import org.apache.geode.internal.serialization.VersionedDataOutputStream;
+import org.apache.geode.internal.tcp.ByteBufferInputStream;
+
+public class GatewaySenderAdvisorTest {
+
+  @Before
+  public void setUp() throws Exception {}
+
+  @After
+  public void tearDown() throws Exception {}
+
+
+  @Test
+  public void testGatewaySenderProfileSerializeAndDeserializeCurrent()
+      throws IOException, ClassNotFoundException {
+    InternalDistributedMember internalDistributedMember =
+        new InternalDistributedMember("localhost", 8888);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile =
+        new GatewaySenderAdvisor.GatewaySenderProfile(internalDistributedMember, 1);
+
+
+    VersionedDataOutputStream versionedDataOutputStream =
+        new VersionedDataOutputStream(byteArrayOutputStream, KnownVersion.CURRENT);
+    DataSerializer.writeObject(gatewaySenderProfile, versionedDataOutputStream);
+    versionedDataOutputStream.flush();
+
+    ByteBuffer bb = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(bb);
+
+    VersionedDataInputStream versionedDataInputStream =
+        new VersionedDataInputStream(byteBufferInputStream, KnownVersion.CURRENT);
+    GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile2 =
+        DataSerializer.readObject(versionedDataInputStream);
+    assertThat(gatewaySenderProfile).isEqualTo(gatewaySenderProfile2);
+  }
+
+
+  @Test
+  public void testGatewaySenderProfileSerializeAndDeserialize113()
+      throws IOException, ClassNotFoundException {
+    InternalDistributedMember internalDistributedMember =
+        new InternalDistributedMember("localhost", 8888);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile =
+        new GatewaySenderAdvisor.GatewaySenderProfile(internalDistributedMember, 1);
+
+
+    VersionedDataOutputStream versionedDataOutputStream =
+        new VersionedDataOutputStream(byteArrayOutputStream, KnownVersion.GEODE_1_13_0);
+    DataSerializer.writeObject(gatewaySenderProfile, versionedDataOutputStream);
+    versionedDataOutputStream.flush();
+
+    ByteBuffer bb = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(bb);
+
+    VersionedDataInputStream versionedDataInputStream =
+        new VersionedDataInputStream(byteBufferInputStream, KnownVersion.GEODE_1_13_0);
+    GatewaySenderAdvisor.GatewaySenderProfile gatewaySenderProfile2 =
+        DataSerializer.readObject(versionedDataInputStream);
+    assertThat(gatewaySenderProfile).isEqualTo(gatewaySenderProfile2);
+  }
+}
diff --git a/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java b/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java
index 8d70ec7..02d3603 100644
--- a/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java
+++ b/geode-wan/src/upgradeTest/java/org/apache/geode/cache/wan/WANRollingUpgradeVerifyGatewaySenderProfile.java
@@ -23,25 +23,23 @@ import org.junit.Test;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.test.dunit.DistributedTestUtils;
-import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.version.VersionManager;
 
 public class WANRollingUpgradeVerifyGatewaySenderProfile extends WANRollingUpgradeDUnitTest {
   @Test
+
   // This test verifies that a GatewaySenderProfile serializes properly between versions.
   public void testVerifyGatewaySenderProfile() {
-    final Host host = Host.getHost(0);
-    VM oldLocator = host.getVM(oldVersion, 0);
-    VM oldServer = host.getVM(oldVersion, 1);
-    VM currentServer = host.getVM(VersionManager.CURRENT_VERSION, 2);
+    VM oldLocator = VM.getVM(oldVersion, 0);
+    VM oldServer = VM.getVM(oldVersion, 1);
+    VM currentServer = VM.getVM(VersionManager.CURRENT_VERSION, 2);
 
     // Start locator
     final int port = getRandomAvailableTCPPort();
     oldLocator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port));
-    final String locators = NetworkUtils.getServerHostName(host) + "[" + port + "]";
+    final String locators = VM.getHostName() + "[" + port + "]";
     oldLocator.invoke(() -> startLocator(port, 0, locators, ""));
 
     IgnoredException ie =
@@ -73,4 +71,50 @@ public class WANRollingUpgradeVerifyGatewaySenderProfile extends WANRollingUpgra
       ie.remove();
     }
   }
+
+  @Test
+  // Verifies that an old server can understand a GatewaySenderProfile from a current server.
+  public void testOldServerCanUnderstandNewGatewaySenderProfile() {
+
+    VM oldLocator = VM.getVM(oldVersion, 0);
+    VM oldServer = VM.getVM(oldVersion, 1);
+    VM currentServer = VM.getVM(VersionManager.CURRENT_VERSION, 2);
+
+    // Start locator
+    final int port = getRandomAvailableTCPPort();
+    oldLocator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port));
+    final String locators = VM.getHostName() + "[" + port + "]";
+    oldLocator.invoke(() -> startLocator(port, 0, locators, ""));
+
+    IgnoredException ie =
+        IgnoredException.addIgnoredException("could not get remote locator information");
+    try {
+      String senderId = getName() + "_gatewaysender";
+
+      // Start current server
+      currentServer.invoke(() -> createCache(locators));
+
+      // Create GatewaySender in new server
+      currentServer.invoke(() -> createGatewaySender(senderId, 10,
+          ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL));
+
+      // Locators before 1.4 handled configuration asynchronously.
+      // We must wait for cluster configuration to be ready, or confirm that it is disabled.
+      oldLocator.invoke(
+          () -> await()
+              .untilAsserted(() -> assertTrue(
+                  !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
+                      || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+      // Start old server
+      oldServer.invoke(() -> createCache(locators));
+
+      // Attempt to create GatewaySender in old server
+      oldServer.invoke(() -> createGatewaySender(senderId, 10,
+          ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL));
+
+    } finally {
+      ie.remove();
+    }
+  }
 }