You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/01/28 01:05:37 UTC
[geode] 06/16: GEODE-8870: Removes GFE_61.
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 01812dfe4b5c40d67ed0c28b42e7d0458e3fb61a
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Thu Jan 21 18:55:31 2021 -0800
GEODE-8870: Removes GFE_61.
---
.../client/internal/ClientSideHandshakeImpl.java | 11 +-
.../cache/tier/sockets/ClientHealthMonitor.java | 2 +-
.../tier/sockets/ClientUpdateMessageImpl.java | 166 ---------------------
.../cache/tier/sockets/MessageDispatcher.java | 7 +-
.../tier/sockets/ServerSideHandshakeImpl.java | 2 +-
.../cache/tier/sockets/SocketMessageWriter.java | 67 ++++-----
.../cache/tier/sockets/ServerConnectionTest.java | 22 ---
.../geode/internal/serialization/KnownVersion.java | 7 -
8 files changed, 39 insertions(+), 245 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
index 8a17041..eb5fcf9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
@@ -229,9 +229,7 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand
readMessage(dis, dos, acceptanceCode, member);
// Read delta-propagation property value from server.
- // [sumedh] Static variable below? Client can connect to different
- // DSes with different values of this. It shoule be a member variable.
- if (!communicationMode.isWAN() && currentClientVersion.isNotOlderThan(KnownVersion.GFE_61)) {
+ if (!communicationMode.isWAN()) {
((InternalDistributedSystem) system).setDeltaEnabledOnServer(dis.readBoolean());
}
@@ -319,13 +317,6 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand
// Read the message (if any)
readMessage(dis, dos, acceptanceCode, member);
- // nothing more to be done for older clients used in tests
- // there is a difference in serializer map registration for >= 6.5.1.6
- // clients but that is not used in tests
- if (currentClientVersion.isOlderThan(KnownVersion.GFE_61)) {
- return new ServerQueueStatus(endpointType, queueSize, member);
- }
-
final Map<Integer, List<String>> instantiatorMap = DataSerializer.readHashMap(dis);
for (final Map.Entry<Integer, List<String>> entry : instantiatorMap.entrySet()) {
final Integer id = entry.getKey();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 45e6cc4..70a1897 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -674,7 +674,7 @@ public class ClientHealthMonitor {
}
public boolean hasDeltaClients() {
- return getNumberOfClientsAtOrAboveVersion(KnownVersion.GFE_61) > 0;
+ return getNumberOfClientsAtOrAboveVersion(KnownVersion.OLDEST) > 0;
}
private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 11cc3e8..2e33fde 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -375,8 +375,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
message = getGFE70Message(proxy, serializedValue, conflation, clientVersion);
} else if (clientVersion.isNotOlderThan(KnownVersion.GFE_65)) {
message = getGFE65Message(proxy, serializedValue, conflation, clientVersion);
- } else if (clientVersion.isNotOlderThan(KnownVersion.GFE_61)) {
- message = getGFE61Message(proxy, serializedValue, conflation, clientVersion);
} else {
throw new IOException(
"Unsupported client version for server-to-client message creation: " + clientVersion);
@@ -385,170 +383,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
return message;
}
- private Message getGFE61Message(CacheClientProxy proxy, byte[] latestValue, boolean conflation,
- KnownVersion clientVersion) throws IOException {
- Message message;
- ClientProxyMembershipID proxyId = proxy.getProxyID();
-
- // Add CQ info.
- int cqMsgParts = 0;
- boolean clientHasCq = _hasCqs && (getCqs(proxyId) != null);
-
- if (clientHasCq) {
- cqMsgParts = (getCqs(proxyId).length * 2) + 1;
- }
-
- if (isCreate() || isUpdate()) {
- // Create or update event
- if (_clientInterestListInv != null && _clientInterestListInv.contains(proxyId)) {
- // Notify all - do not send the value
- message = new Message(6, clientVersion);
- message.setMessageType(MessageType.LOCAL_INVALIDATE);
-
- // Add the region name
- message.addStringPart(_regionName, true);
-
- // Add the key
- // Currently serializing the key here instead of when the message
- // is put in the queue so that it can be conflated it later
- message.addStringOrObjPart(_keyOfInterest);
-
- // Add the callback argument
- message.addObjPart(_callbackArgument);
-
- // Add interestlist status.
- message.addObjPart(isClientInterested(proxyId));
-
- // Add CQ status.
- message.addObjPart(Boolean.FALSE);
-
- } else {
- boolean isClientInterested = isClientInterested(proxyId);
- // Notify by subscription - send the value
- message = new Message(8 + cqMsgParts, clientVersion);
-
- // Set message type
- if (isCreate()) {
- message.setMessageType(MessageType.LOCAL_CREATE);
-
- // Add the region name
- message.addStringPart(_regionName, true);
-
- // Add the key
- // Currently serializing the key here instead of when the message
- // is put in the queue so that it can be conflated it later
- message.addStringOrObjPart(_keyOfInterest);
-
- message.addObjPart(Boolean.FALSE); // NO delta
- // Add the value (which has already been serialized)
- message.addRawPart(latestValue, (_valueIsObject == 0x01));
- } else {
- message.setMessageType(MessageType.LOCAL_UPDATE);
-
- // Add the region name
- message.addStringPart(_regionName, true);
-
- // Add the key
- // Currently serializing the key here instead of when the message
- // is put in the queue so that it can be conflated it later
- message.addStringOrObjPart(_keyOfInterest);
-
- if (deltaBytes != null && !conflation && !proxy.isMarkerEnqueued()
- && !proxy.getRegionsWithEmptyDataPolicy().containsKey(_regionName)) {
- message.addObjPart(Boolean.TRUE);
- message.addBytesPart(deltaBytes);
- proxy.getStatistics().incDeltaMessagesSent();
- } else {
- message.addObjPart(Boolean.FALSE);
- byte[] l = latestValue;
- if (l == null) {
- if (!(_value instanceof byte[])) {
- _value = CacheServerHelper.serialize(_value);
- }
- l = (byte[]) _value;
- }
- // Add the value (which has already been serialized)
- message.addRawPart(l, (_valueIsObject == 0x01));
- }
- }
-
- // Add the callback argument
- message.addObjPart(_callbackArgument);
-
- // Add interest list status.
- message.addObjPart(isClientInterested);
-
- // Add CQ status.
- message.addObjPart(clientHasCq);
-
- if (clientHasCq) {
- addCqsToMessage(proxyId, message);
- }
- }
- } else if (isDestroy() || isInvalidate()) {
- // Destroy or invalidate event
- message = new Message(6 + cqMsgParts, clientVersion);
-
- if (isDestroy()) {
- message.setMessageType(MessageType.LOCAL_DESTROY);
- } else {
- message.setMessageType(MessageType.LOCAL_INVALIDATE);
- }
-
- message.addStringPart(_regionName, true);
-
- // Currently serializing the key here instead of when the message
- // is put in the queue so that it can be conflated later
- message.addStringOrObjPart(_keyOfInterest);
-
- message.addObjPart(_callbackArgument);
- message.addObjPart(isClientInterested(proxyId));
- message.addObjPart(clientHasCq);
-
- if (clientHasCq) {
- addCqsToMessage(proxyId, message);
- }
- } else if (isDestroyRegion()) {
- message = new Message(4 + cqMsgParts, clientVersion);
- message.setMessageType(MessageType.LOCAL_DESTROY_REGION);
- message.addStringPart(_regionName, true);
- message.addObjPart(_callbackArgument);
- message.addObjPart(clientHasCq);
-
- if (clientHasCq) {
- addCqsToMessage(proxyId, message);
- }
- } else if (isClearRegion()) {
- message = new Message(4 + cqMsgParts, clientVersion);
- message.setMessageType(MessageType.CLEAR_REGION);
- message.addStringPart(_regionName, true);
- message.addObjPart(_callbackArgument);
- message.addObjPart(clientHasCq);
-
- if (clientHasCq) {
- addCqsToMessage(proxyId, message);
- }
- } else if (isInvalidateRegion()) {
- message = new Message(4 + cqMsgParts, clientVersion);
- message.setMessageType(MessageType.INVALIDATE_REGION);
- message.addStringPart(_regionName, true);
- message.addObjPart(_callbackArgument);
- message.addObjPart(clientHasCq);
-
- if (clientHasCq) {
- addCqsToMessage(proxyId, message);
- }
- } else {
- throw new InternalGemFireError("Don't know what kind of message");
- }
-
- message.setTransactionId(0);
- // Add the EventId since 5.1 (used to prevent duplicate events
- // received on the client side after a failover)
- message.addObjPart(_eventIdentifier);
- return message;
- }
-
private Message getGFE65Message(CacheClientProxy proxy, byte[] p_latestValue,
boolean conflation, KnownVersion clientVersion) throws IOException {
byte[] latestValue = p_latestValue;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index 2fb63d2..c1c9cf3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -44,7 +44,6 @@ import org.apache.geode.internal.cache.ha.HARegionQueueAttributes;
import org.apache.geode.internal.cache.ha.HARegionQueueStats;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
-import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -137,9 +136,9 @@ public class MessageDispatcher extends LoggingThread {
((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer())
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy());
boolean createDurableQueue = proxy.proxyID.isDurable();
- boolean canHandleDelta = (proxy.getClientVersion().isNotOlderThan(KnownVersion.GFE_61))
- && InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
- && !(this._proxy.clientConflation == Handshake.CONFLATION_ON);
+ boolean canHandleDelta =
+ InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
+ && !(this._proxy.clientConflation == Handshake.CONFLATION_ON);
if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
logger.debug("Creating a {} subscription queue for {}",
createDurableQueue ? "durable" : "non-durable",
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
index 89f1e23..0e5055e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
@@ -169,7 +169,7 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand
dos.writeUTF("");
// Write delta-propagation property value if this is not WAN.
- if (!communicationMode.isWAN() && clientVersion.isNotOlderThan(KnownVersion.GFE_61)) {
+ if (!communicationMode.isWAN()) {
dos.writeBoolean(((InternalDistributedSystem) system).getConfig().getDeltaPropagation());
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
index 4a4a8bb..89bdc8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java
@@ -45,45 +45,44 @@ public class SocketMessageWriter {
msg = "";
}
dos.writeUTF(msg);
- if (clientVersion != null && clientVersion.isNotOlderThan(KnownVersion.GFE_61)) {
- // get all the instantiators.
- Instantiator[] instantiators = InternalInstantiator.getInstantiators();
- HashMap instantiatorMap = new HashMap();
- if (instantiators != null && instantiators.length > 0) {
- for (Instantiator instantiator : instantiators) {
- ArrayList instantiatorAttributes = new ArrayList();
- instantiatorAttributes.add(instantiator.getClass().toString().substring(6));
- instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6));
- instantiatorMap.put(instantiator.getId(), instantiatorAttributes);
- }
+
+ // get all the instantiators.
+ Instantiator[] instantiators = InternalInstantiator.getInstantiators();
+ HashMap instantiatorMap = new HashMap();
+ if (instantiators != null && instantiators.length > 0) {
+ for (Instantiator instantiator : instantiators) {
+ ArrayList instantiatorAttributes = new ArrayList();
+ instantiatorAttributes.add(instantiator.getClass().toString().substring(6));
+ instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6));
+ instantiatorMap.put(instantiator.getId(), instantiatorAttributes);
}
- DataSerializer.writeHashMap(instantiatorMap, dos);
+ }
+ DataSerializer.writeHashMap(instantiatorMap, dos);
- // get all the dataserializers.
- DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
- HashMap<Integer, ArrayList<String>> dsToSupportedClasses =
- new HashMap<Integer, ArrayList<String>>();
- HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>();
- if (dataSerializers != null && dataSerializers.length > 0) {
- for (DataSerializer dataSerializer : dataSerializers) {
- dataSerializersMap.put(dataSerializer.getId(),
- dataSerializer.getClass().toString().substring(6));
- if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) {
- ArrayList<String> supportedClassNames = new ArrayList<String>();
- for (Class clazz : dataSerializer.getSupportedClasses()) {
- supportedClassNames.add(clazz.getName());
- }
- dsToSupportedClasses.put(dataSerializer.getId(), supportedClassNames);
+ // get all the dataserializers.
+ DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
+ HashMap<Integer, ArrayList<String>> dsToSupportedClasses =
+ new HashMap<Integer, ArrayList<String>>();
+ HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>();
+ if (dataSerializers != null && dataSerializers.length > 0) {
+ for (DataSerializer dataSerializer : dataSerializers) {
+ dataSerializersMap.put(dataSerializer.getId(),
+ dataSerializer.getClass().toString().substring(6));
+ if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) {
+ ArrayList<String> supportedClassNames = new ArrayList<String>();
+ for (Class clazz : dataSerializer.getSupportedClasses()) {
+ supportedClassNames.add(clazz.getName());
}
+ dsToSupportedClasses.put(dataSerializer.getId(), supportedClassNames);
}
}
- DataSerializer.writeHashMap(dataSerializersMap, dos);
- if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) {
- DataSerializer.writeHashMap(dsToSupportedClasses, dos);
- }
- if (clientVersion.isNotOlderThan(KnownVersion.GEODE_1_5_0)) {
- dos.writeInt(CLIENT_PING_TASK_PERIOD);
- }
+ }
+ DataSerializer.writeHashMap(dataSerializersMap, dos);
+ if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) {
+ DataSerializer.writeHashMap(dsToSupportedClasses, dos);
+ }
+ if (clientVersion.isNotOlderThan(KnownVersion.GEODE_1_5_0)) {
+ dos.writeInt(CLIENT_PING_TASK_PERIOD);
}
dos.flush();
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index 792dbf0..c5cde15 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -91,28 +91,6 @@ public class ServerConnectionTest {
}
@Test
- public void pre65SecureShouldReturnUserAuthId() {
- long userAuthId = 12345L;
- when(handshake.getVersion()).thenReturn(KnownVersion.GFE_61);
- serverConnection.setUserAuthId(userAuthId);
-
- long value = serverConnection.getUniqueId();
-
- assertThat(value).isEqualTo(userAuthId);
- }
-
- @Test
- public void pre65NonSecureShouldReturnUserAuthId() {
- when(handshake.getVersion()).thenReturn(KnownVersion.GFE_61);
- long userAuthId = 12345L;
- serverConnection.setUserAuthId(userAuthId);
-
- long value = serverConnection.getUniqueId();
-
- assertThat(value).isEqualTo(userAuthId);
- }
-
- @Test
public void post65SecureShouldUseUniqueIdFromMessage() {
long uniqueIdFromMessage = 23456L;
MessageIdExtractor messageIdExtractor = mock(MessageIdExtractor.class);
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java
index fe41527..d88b3b7 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java
@@ -62,13 +62,6 @@ public class KnownVersion extends AbstractVersion {
public static final KnownVersion TOKEN =
new KnownVersion("", "TOKEN", (byte) -1, (byte) 0, (byte) 0, (byte) 0, TOKEN_ORDINAL);
- private static final short GFE_61_ORDINAL = 5;
-
- @Immutable
- @Deprecated
- public static final KnownVersion GFE_61 =
- new KnownVersion("GFE", "6.1", (byte) 6, (byte) 1, (byte) 0, (byte) 0, GFE_61_ORDINAL);
-
private static final short GFE_65_ORDINAL = 6;
@Immutable