You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/12/30 16:20:55 UTC
geode git commit: GEODE-2257 Client configured to use locator with
addPoolServer fails to connect
Repository: geode
Updated Branches:
refs/heads/develop 17577a6f5 -> e3cb1b747
GEODE-2257 Client configured to use locator with addPoolServer fails to connect
The first byte that a client sends is a connection-type that is >= 100.
The first bytes expected by a locator are the 4 bytes of an integer
indicating the protocol version. I've changed the locator to read
the first byte and, if it's >= 100 send a reply byte back to the client
telling it that it's trying to contact a locator using a client/server
handshake.
The client has a new reply code that the locator is now using. If the
client sees this reply code it will throw a GemFireConfigException. Some
of these exceptions will be thrown in the background and get logged but
the thread initiating cache creation will also get this exception when
it invokes ClientCacheFactory.create().
The client-side error message will be in this form:
_Improperly configured client detected. Server at 10.154.30.28 is actually
a locator. Use addPoolLocator to configure locators_.
The locator will also log a warning in this form so that alerts will be
raised:
_Unable to process request from 10.118.33.195 exception=Improperly
configured client detected - use addPoolLocator to configure its
locators instead of addPoolServer_.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e3cb1b74
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e3cb1b74
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e3cb1b74
Branch: refs/heads/develop
Commit: e3cb1b74710fc94b8e100f6031e0ea8f2c48b5f4
Parents: 17577a6
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Dec 30 08:17:26 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Dec 30 08:18:41 2016 -0800
----------------------------------------------------------------------
.../client/internal/ConnectionFactoryImpl.java | 3 +
.../cache/client/internal/ConnectionImpl.java | 2 +-
.../cache/client/internal/QueueManagerImpl.java | 3 +
.../geode/internal/cache/tier/Acceptor.java | 5 ++
.../cache/tier/sockets/CacheClientUpdater.java | 2 +-
.../internal/cache/tier/sockets/HandShake.java | 92 ++++++++------------
.../tier/sockets/ClientServerMiscDUnitTest.java | 29 +++---
7 files changed, 60 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
index eceabcb..a419d57 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
+import org.apache.geode.GemFireConfigException;
import org.apache.geode.cache.GatewayConfigurationException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.internal.ServerBlackList.FailureTracker;
@@ -139,6 +140,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
connection.setHandShake(connHandShake);
authenticateIfRequired(connection);
initialized = true;
+ } catch (GemFireConfigException e) {
+ throw e;
} catch (CancelException e) {
// propagate this up, don't retry
throw e;
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index 6bc68ae..a494138 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -103,7 +103,7 @@ public class ConnectionImpl implements Connection {
theSocket.setSoTimeout(handShakeTimeout);
out = theSocket.getOutputStream();
in = theSocket.getInputStream();
- this.status = handShake.greet(this, location, communicationMode);
+ this.status = handShake.handshakeWithServer(this, location, communicationMode);
commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
if (sender != null) {
commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 4712268..965ee57 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.geode.GemFireConfigException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -466,6 +467,8 @@ public class QueueManagerImpl implements QueueManager {
connection = factory.createClientToServerConnection(server, true);
} catch (GemFireSecurityException e) {
throw e;
+ } catch (GemFireConfigException e) {
+ throw e;
} catch (Exception e) {
if (isDebugEnabled) {
logger.debug("SubscriptionManager - Error connected to server: {}", server, e);
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 0454f53..9a3241b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -26,6 +26,10 @@ import org.apache.geode.internal.Version;
* @since GemFire 2.0.2
*/
public abstract class Acceptor {
+
+ // The following are communications "mode" bytes sent as the first byte of a
+ // client/server handshake. They must not be larger than 1 byte
+
/**
* Byte meaning that the Socket is being used for 'client to server' communication.
*/
@@ -67,6 +71,7 @@ public abstract class Acceptor {
*/
public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107;
+
/**
* The GFE version of the server.
*
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index b4a6bed..f85ecb4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -300,7 +300,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
mySock.getInetAddress().getHostAddress(), mySock.getLocalPort(), mySock.getPort());
}
- ServerQueueStatus sqs = handshake.greetNotifier(mySock, this.isPrimary);
+ ServerQueueStatus sqs = handshake.handshakeWithSubscriptionFeed(mySock, this.isPrimary);
if (sqs.isPrimary() || sqs.isNonRedundant()) {
PoolImpl pool = (PoolImpl) this.qManager.getPool();
if (!pool.getReadyForEventsCalled()) {
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 9a5c6c6..6e119c0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -60,6 +60,7 @@ import javax.net.ssl.SSLSocket;
import org.apache.geode.CancelCriterion;
import org.apache.geode.DataSerializer;
+import org.apache.geode.GemFireConfigException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.GatewayConfigurationException;
import org.apache.geode.cache.client.PoolFactory;
@@ -116,6 +117,8 @@ public class HandShake implements ClientHandShake {
protected static final byte REPLY_AUTH_NOT_REQUIRED = (byte) 66;
+ public static final byte REPLY_SERVER_IS_LOCATOR = (byte) 67;
+
private static SecurityService securityService = IntegratedSecurityService.getSecurityService();
private byte code;
@@ -124,11 +127,6 @@ public class HandShake implements ClientHandShake {
private boolean isRead = false;
protected final DistributedSystem system;
- /** Singleton for client side */
- // Client has no more a singleton handShake instance. Now each connection will
- // have its own handShake instance.
- private static HandShake handshake;
-
final protected ClientProxyMembershipID id;
private Properties credentials;
@@ -251,7 +249,9 @@ public class HandShake implements ClientHandShake {
id = null;
}
- /** Constructor used by server side connection */
+ /**
+ * HandShake Constructor used by server side connection
+ */
public HandShake(Socket sock, int timeout, DistributedSystem sys, Version clientVersion,
byte communicationMode) throws IOException, AuthenticationRequiredException {
this.clientVersion = clientVersion;
@@ -325,12 +325,9 @@ public class HandShake implements ClientHandShake {
return this.clientVersion;
}
- // private void initSingleton(DistributedSystem sys) {
- // id = ClientProxyMembershipID.getNewProxyMembership();
- // this.system = sys;
- // this.code = REPLY_OK;
- // }
-
+ /**
+ * Client-side handshake. This form of HandShake can communicate with a server
+ */
public HandShake(ClientProxyMembershipID id, DistributedSystem sys) {
this.id = id;
this.code = REPLY_OK;
@@ -343,6 +340,9 @@ public class HandShake implements ClientHandShake {
this.id.updateID(idm);
}
+ /**
+ * Clone a HandShake to be used in creating other connections
+ */
public HandShake(HandShake handShake) {
this.appSecureMode = handShake.appSecureMode;
this.clientConflation = handShake.clientConflation;
@@ -362,19 +362,6 @@ public class HandShake implements ClientHandShake {
this._encrypt = null;
}
- /*
- * private void read(DataInputStream dis,byte[] toFill) throws IOException { /* this.code = (byte)
- * in.read(); if (this.code == -1) { throw new
- * IOException(LocalizedStrings.HandShake_HANDSHAKE_READ_AT_END_OF_STREAM.toLocalizedString()); }
- * if (this.code != REPLY_OK) { throw new
- * IOException(LocalizedStrings.HandShake_HANDSHAKE_REPLY_CODE_IS_NOT_OK.toLocalizedString()); }
- * try { ObjectInput in = new ObjectInputStream(is); this.id =
- * ClientProxyMembershipID.readCanonicalized(in); } catch(IOException ioe) { this.code = -2; throw
- * ioe; } catch(ClassNotFoundException cnfe) { this.code = -3; IOException e = new
- * IOException(LocalizedStrings.HandShake_ERROR_DESERIALIZING_HANDSHAKE.toLocalizedString());
- * e.initCause(cnfe); throw e; } } finally { synchronized (this) { this.isRead = true; } } }
- */
-
// used by the client side
private byte setClientConflation() {
byte result = CONFLATION_DEFAULT;
@@ -455,7 +442,10 @@ public class HandShake implements ClientHandShake {
}
}
- public byte write(DataOutputStream dos, DataInputStream dis, byte communicationMode,
+ /**
+ * client-to-server handshake. Nothing is sent to the server prior to invoking this method.
+ */
+ private byte write(DataOutputStream dos, DataInputStream dis, byte communicationMode,
int replyCode, int readTimeout, List ports, Properties p_credentials,
DistributedMember member, boolean isCallbackConnection) throws IOException {
HeapDataOutputStream hdos = new HeapDataOutputStream(32, Version.CURRENT);
@@ -529,17 +519,7 @@ public class HandShake implements ClientHandShake {
}
/**
- *
* This assumes that authentication is the last piece of info in handshake
- *
- * @param dos
- * @param dis
- * @param p_credentials
- * @param isNotification
- * @param member
- * @param heapdos stream to append data to.
- * @throws IOException
- * @throws GemFireSecurityException
*/
public void writeCredentials(DataOutputStream dos, DataInputStream dis, Properties p_credentials,
boolean isNotification, DistributedMember member, HeapDataOutputStream heapdos)
@@ -666,15 +646,6 @@ public class HandShake implements ClientHandShake {
* This method writes what readCredential() method expects to read. (Note the use of singular
* credential). It is similar to writeCredentials(), except that it doesn't write
* credential-properties.
- *
- * @param dos
- * @param dis
- * @param authInit
- * @param isNotification
- * @param member
- * @param heapdos
- * @throws IOException
- * @throws GemFireSecurityException
*/
public byte writeCredential(DataOutputStream dos, DataInputStream dis, String authInit,
boolean isNotification, DistributedMember member, HeapDataOutputStream heapdos)
@@ -1201,20 +1172,23 @@ public class HandShake implements ClientHandShake {
* @param sock the socket this handshake is operating on
* @return temporary id to reprent the other vm
*/
- private DistributedMember getDistributedMember(Socket sock) {
+ private DistributedMember getIDForSocket(Socket sock) {
return new InternalDistributedMember(sock.getInetAddress(), sock.getPort(), false);
}
- public ServerQueueStatus greet(Connection conn, ServerLocation location, byte communicationMode)
- throws IOException, AuthenticationRequiredException, AuthenticationFailedException,
- ServerRefusedConnectionException {
+ /**
+ * Client-side handshake with a Server
+ */
+ public ServerQueueStatus handshakeWithServer(Connection conn, ServerLocation location,
+ byte communicationMode) throws IOException, AuthenticationRequiredException,
+ AuthenticationFailedException, ServerRefusedConnectionException {
try {
ServerQueueStatus serverQStatus = null;
Socket sock = conn.getSocket();
DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
final InputStream in = sock.getInputStream();
DataInputStream dis = new DataInputStream(in);
- DistributedMember member = getDistributedMember(sock);
+ DistributedMember member = getIDForSocket(sock);
// if running in a loner system, use the new port number in the ID to
// help differentiate from other clients
DM dm = ((InternalDistributedSystem) this.system).getDistributionManager();
@@ -1246,6 +1220,10 @@ public class HandShake implements ClientHandShake {
throw new AuthenticationRequiredException(
LocalizedStrings.HandShake_SERVER_EXPECTING_SSL_CONNECTION.toLocalizedString());
}
+ if (acceptanceCode == REPLY_SERVER_IS_LOCATOR) {
+ throw new GemFireConfigException("Improperly configured client detected. " + "Server at "
+ + location + " is actually a locator. Use addPoolLocator to configure locators.");
+ }
// Successful handshake for GATEWAY_TO_GATEWAY mode sets the peer version in connection
if (communicationMode == Acceptor.GATEWAY_TO_GATEWAY
@@ -1309,7 +1287,11 @@ public class HandShake implements ClientHandShake {
}
}
- public ServerQueueStatus greetNotifier(Socket sock, boolean isPrimary)
+ /**
+ * Used by client-side CacheClientUpdater to handshake with a server in order to receive messages
+ * generated by subscriptions (register-interest, continuous query)
+ */
+ public ServerQueueStatus handshakeWithSubscriptionFeed(Socket sock, boolean isPrimary)
throws IOException, AuthenticationRequiredException, AuthenticationFailedException,
ServerRefusedConnectionException, ClassNotFoundException {
ServerQueueStatus sqs = null;
@@ -1317,7 +1299,7 @@ public class HandShake implements ClientHandShake {
DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
final InputStream in = sock.getInputStream();
DataInputStream dis = new DataInputStream(in);
- DistributedMember member = getDistributedMember(sock);
+ DistributedMember member = getIDForSocket(sock);
if (!this.multiuserSecureMode) {
this.credentials = getCredentials(member);
}
@@ -1466,12 +1448,6 @@ public class HandShake implements ClientHandShake {
return false;
final HandShake that = (HandShake) other;
- /*
- * if (identity != null && identity.length > 0) { for (int i = 0; i < identity.length; i++) { if
- * (this.identity[i] != that.identity[i]) return false; } } if (this.code != that.code) return
- * false; return true;
- */
-
if (this.id.isSameDSMember(that.id) && this.code == that.code) {
return true;
} else {
http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index 82f30bf..391653c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -21,6 +21,10 @@ import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
+import org.apache.geode.GemFireConfigException;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -743,6 +747,15 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
}
}
+ @Test(expected = GemFireConfigException.class)
+ public void clientIsPreventedFromConnectingToLocatorAsServer() throws Exception {
+ IgnoredException.addIgnoredException("Improperly configured client detected");
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.addPoolServer("localhost", DistributedTestUtils.getDUnitLocatorPort());
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ getClientCache(clientCacheFactory);
+ }
+
private void createCache(Properties props) throws Exception {
createCacheV(props);
@@ -1317,22 +1330,6 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
}
}
- @Override
- public final void postTearDownCacheTestCase() throws Exception {
- // close the clients first
- closeCacheAndDisconnect();
- // then close the servers
- server1.invoke(() -> ClientServerMiscDUnitTest.closeCacheAndDisconnect());
- }
-
- public static void closeCacheAndDisconnect() {
- Cache cache = new ClientServerMiscDUnitTest().getCache();
- if (cache != null && !cache.isClosed()) {
- cache.close();
- cache.getDistributedSystem().disconnect();
- }
- }
-
/**
* set the boolean for starting the dispatcher thread a bit later to FALSE. This is just a
* precaution in case any test set it to true and did not unset it on completion.