You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/25 19:37:57 UTC
[geode] 01/01: GEODE-3637: Moved client queue initialization into the ServerConnection.java
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5a53b68b3870c78fe6d8d9829b1d4ea4cdf439d0
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Wed Oct 25 12:37:50 2017 -0700
GEODE-3637: Moved client queue initialization into the ServerConnection.java
---
.../internal/cache/tier/sockets/AcceptorImpl.java | 21 +++---
.../cache/tier/sockets/ServerConnection.java | 77 +++++++++++++++-------
2 files changed, 65 insertions(+), 33 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 59ef466..ad910bd 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1405,6 +1405,10 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
return this.clientServerCnxCount.get();
}
+ public boolean isNotifyBySubscription() {
+ return notifyBySubscription;
+ }
+
protected void handleNewClientConnection(final Socket socket,
final ServerConnectionFactory serverConnectionFactory) throws IOException {
// Read the first byte. If this socket is being used for 'client to server'
@@ -1427,14 +1431,15 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
throw new EOFException();
}
- if (communicationMode.isSubscriptionFeed()) {
- boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
- logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
- primary ? "primary" : "secondary", socket);
- AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
- this.notifyBySubscription);
- return;
- }
+ // GEODE-3637
+ // if (communicationMode.isSubscriptionFeed()) {
+ // boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+ // logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+ // primary ? "primary" : "secondary", socket);
+ // AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+ // this.notifyBySubscription);
+ // return;
+ // }
logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
socket);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 7fc688c..5c69769 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -72,7 +72,7 @@ import org.apache.geode.security.GemFireSecurityException;
* Provides an implementation for the server socket end of the hierarchical cache connection. Each
* server connection runs in its own thread to maximize concurrency and improve response times to
* edge requests
- *
+ *
* @since GemFire 2.0.2
*/
public abstract class ServerConnection implements Runnable {
@@ -189,16 +189,18 @@ public abstract class ServerConnection implements Runnable {
*/
private volatile int requestSpecificTimeout = -1;
- /** Tracks the id of the most recent batch to which a reply has been sent */
+ /**
+ * Tracks the id of the most recent batch to which a reply has been sent
+ */
private int latestBatchIdReplied = -1;
/*
* Uniquely identifying the client's Distributed System
*
- *
+ *
* private String membershipId;
- *
- *
+ *
+ *
* Uniquely identifying the client's ConnectionProxy object
*
*
@@ -711,8 +713,9 @@ public abstract class ServerConnection implements Runnable {
// can be used.
initializeCommands();
// its initialized in verifyClientConnection call
- if (!getCommunicationMode().isWAN())
+ if (!getCommunicationMode().isWAN()) {
initializeClientUserAuths();
+ }
}
if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
@@ -892,7 +895,9 @@ public abstract class ServerConnection implements Runnable {
}
}
if (unregisterClient)// last serverconnection call all close on auth objects
+ {
cleanClientAuths();
+ }
this.clientUserAuths = null;
if (needsUnregister) {
this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
@@ -917,8 +922,9 @@ public abstract class ServerConnection implements Runnable {
ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
- if (retCua == null)
+ if (retCua == null) {
return cua;
+ }
return retCua;
}
@@ -954,8 +960,9 @@ public abstract class ServerConnection implements Runnable {
boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
// if not successfull, try the old way
- if (!removed)
+ if (!removed) {
removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
+ }
return removed;
} catch (NullPointerException npe) {
@@ -984,7 +991,7 @@ public abstract class ServerConnection implements Runnable {
/*
* This means that client and server VMs have different security settings. The server does
* not have any security settings specified while client has.
- *
+ *
* Here, should we just ignore this and send the dummy security part (connectionId, userId)
* in the response (in this case, client needs to know that it is not expected to read any
* security part in any of the server response messages) or just throw an exception
@@ -1010,7 +1017,6 @@ public abstract class ServerConnection implements Runnable {
throw new AuthenticationFailedException("Authentication failed");
}
-
byte[] credBytes = msg.getPart(0).getSerializedForm();
credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
@@ -1066,7 +1072,7 @@ public abstract class ServerConnection implements Runnable {
/**
* MessageType of the messages (typically internal commands) which do not need to participate in
* security should be added in the following if block.
- *
+ *
* @return Part
* @see AbstractOp#processSecureBytes(Connection, Message)
* @see AbstractOp#needsUserId()
@@ -1124,9 +1130,12 @@ public abstract class ServerConnection implements Runnable {
public void run() {
setOwner();
+
if (getAcceptor().isSelector()) {
boolean finishedMsg = false;
try {
+ initializeClientNofication();
+
this.stats.decThreadQueueSize();
if (!isTerminated()) {
getAcceptor().setTLCommBuffer();
@@ -1136,9 +1145,7 @@ public abstract class ServerConnection implements Runnable {
finishedMsg = true;
}
}
- } catch (java.nio.channels.ClosedChannelException ignore) {
- // ok shutting down
- } catch (CancelException e) {
+ } catch (java.nio.channels.ClosedChannelException | CancelException ignore) {
// ok shutting down
} catch (IOException ex) {
logger.warn(
@@ -1183,10 +1190,21 @@ public abstract class ServerConnection implements Runnable {
}
}
+ private void initializeClientNofication() throws IOException {
+ if (communicationMode.isSubscriptionFeed()) {
+ boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+ logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+ primary ? "primary" : "secondary", theSocket);
+ getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
+ getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
+ }
+ }
+
/**
* If registered with a selector then this will be the key we are registered with.
*/
// private SelectionKey sKey = null;
+
/**
* Register this connection with the given selector for read events. Note that switch the channel
* to non-blocking so it can be in a selector.
@@ -1202,7 +1220,8 @@ public abstract class ServerConnection implements Runnable {
}
public void registerWithSelector2(Selector s) throws IOException {
- /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+ /* this.sKey = */
+ getSelectableChannel().register(s, SelectionKey.OP_READ, this);
}
/**
@@ -1225,7 +1244,6 @@ public abstract class ServerConnection implements Runnable {
}
/**
- *
* @return String representing the DistributedSystemMembership of the Client VM
*/
public String getMembershipID() {
@@ -1265,10 +1283,11 @@ public abstract class ServerConnection implements Runnable {
}
protected int getClientReadTimeout() {
- if (this.requestSpecificTimeout == -1)
+ if (this.requestSpecificTimeout == -1) {
return this.handshake.getClientReadTimeout();
- else
+ } else {
return this.requestSpecificTimeout;
+ }
}
protected boolean isProcessingMessage() {
@@ -1492,7 +1511,7 @@ public abstract class ServerConnection implements Runnable {
/**
* Just ensure that this class gets loaded.
- *
+ *
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
@@ -1519,7 +1538,9 @@ public abstract class ServerConnection implements Runnable {
return this.name;
}
- /** returns the name of this connection */
+ /**
+ * returns the name of this connection
+ */
public String getName() {
return this.name;
}
@@ -1736,11 +1757,13 @@ public abstract class ServerConnection implements Runnable {
// for backward client it will be store in member variable userAuthId
// for other look "requestMsg" here and get unique-id from this to get the authzrequest
- if (!AcceptorImpl.isAuthenticationRequired())
+ if (!AcceptorImpl.isAuthenticationRequired()) {
return null;
+ }
- if (AcceptorImpl.isIntegratedSecurity())
+ if (AcceptorImpl.isIntegratedSecurity()) {
return null;
+ }
long uniqueId = getUniqueId();
@@ -1768,11 +1791,13 @@ public abstract class ServerConnection implements Runnable {
public AuthorizeRequestPP getPostAuthzRequest()
throws AuthenticationRequiredException, IOException {
- if (!AcceptorImpl.isAuthenticationRequired())
+ if (!AcceptorImpl.isAuthenticationRequired()) {
return null;
+ }
- if (AcceptorImpl.isIntegratedSecurity())
+ if (AcceptorImpl.isIntegratedSecurity()) {
return null;
+ }
// look client version and return authzrequest
// for backward client it will be store in member variable userAuthId
@@ -1799,7 +1824,9 @@ public abstract class ServerConnection implements Runnable {
return postAuthReq;
}
- /** returns the member ID byte array to be used for creating EventID objects */
+ /**
+ * returns the member ID byte array to be used for creating EventID objects
+ */
public byte[] getEventMemberIDByteArray() {
return this.memberIdByteArray;
}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.