You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/12/30 16:20:55 UTC

geode git commit: GEODE-2257 Client configured to use locator with addPoolServer fails to connect

Repository: geode
Updated Branches:
  refs/heads/develop 17577a6f5 -> e3cb1b747


GEODE-2257 Client configured to use locator with addPoolServer fails to connect

The first byte that a client sends is a connection-type that is >= 100.
The first bytes expected by a locator are the 4 bytes of an integer
indicating the protocol version.  I've changed the locator to read
the first byte and, if it's >= 100, send a reply byte back to the client
telling it that it's trying to contact a locator using a client/server
handshake.

The client has a new reply code that the locator is now using.  If the
client sees this reply code, it will throw a GemFireConfigException.  Some
of these exceptions will be thrown in the background and get logged, but
the thread initiating cache creation will also get this exception when
it invokes ClientCacheFactory.create().

The client-side error message will be in this form:
_Improperly configured client detected. Server at 10.154.30.28 is actually
a locator.  Use addPoolLocator to configure locators_.

The locator will also log a warning in this form so that alerts will be
raised:
_Unable to process request from 10.118.33.195 exception=Improperly
configured client detected - use addPoolLocator to configure its
locators instead of addPoolServer_.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e3cb1b74
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e3cb1b74
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e3cb1b74

Branch: refs/heads/develop
Commit: e3cb1b74710fc94b8e100f6031e0ea8f2c48b5f4
Parents: 17577a6
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Dec 30 08:17:26 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Dec 30 08:18:41 2016 -0800

----------------------------------------------------------------------
 .../client/internal/ConnectionFactoryImpl.java  |  3 +
 .../cache/client/internal/ConnectionImpl.java   |  2 +-
 .../cache/client/internal/QueueManagerImpl.java |  3 +
 .../geode/internal/cache/tier/Acceptor.java     |  5 ++
 .../cache/tier/sockets/CacheClientUpdater.java  |  2 +-
 .../internal/cache/tier/sockets/HandShake.java  | 92 ++++++++------------
 .../tier/sockets/ClientServerMiscDUnitTest.java | 29 +++---
 7 files changed, 60 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
index eceabcb..a419d57 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
+import org.apache.geode.GemFireConfigException;
 import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.client.internal.ServerBlackList.FailureTracker;
@@ -139,6 +140,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
       connection.setHandShake(connHandShake);
       authenticateIfRequired(connection);
       initialized = true;
+    } catch (GemFireConfigException e) {
+      throw e;
     } catch (CancelException e) {
       // propagate this up, don't retry
       throw e;

http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index 6bc68ae..a494138 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -103,7 +103,7 @@ public class ConnectionImpl implements Connection {
     theSocket.setSoTimeout(handShakeTimeout);
     out = theSocket.getOutputStream();
     in = theSocket.getInputStream();
-    this.status = handShake.greet(this, location, communicationMode);
+    this.status = handShake.handshakeWithServer(this, location, communicationMode);
     commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
     if (sender != null) {
       commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);

http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 4712268..965ee57 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.GemFireConfigException;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -466,6 +467,8 @@ public class QueueManagerImpl implements QueueManager {
         connection = factory.createClientToServerConnection(server, true);
       } catch (GemFireSecurityException e) {
         throw e;
+      } catch (GemFireConfigException e) {
+        throw e;
       } catch (Exception e) {
         if (isDebugEnabled) {
           logger.debug("SubscriptionManager - Error connected to server: {}", server, e);

http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 0454f53..9a3241b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -26,6 +26,10 @@ import org.apache.geode.internal.Version;
  * @since GemFire 2.0.2
  */
 public abstract class Acceptor {
+
+  // The following are communications "mode" bytes sent as the first byte of a
+  // client/server handshake. They must not be larger than 1 byte
+
   /**
    * Byte meaning that the Socket is being used for 'client to server' communication.
    */
@@ -67,6 +71,7 @@ public abstract class Acceptor {
    */
   public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107;
 
+
   /**
    * The GFE version of the server.
    * 

http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index b4a6bed..f85ecb4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -300,7 +300,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             mySock.getInetAddress().getHostAddress(), mySock.getLocalPort(), mySock.getPort());
       }
 
-      ServerQueueStatus sqs = handshake.greetNotifier(mySock, this.isPrimary);
+      ServerQueueStatus sqs = handshake.handshakeWithSubscriptionFeed(mySock, this.isPrimary);
       if (sqs.isPrimary() || sqs.isNonRedundant()) {
         PoolImpl pool = (PoolImpl) this.qManager.getPool();
         if (!pool.getReadyForEventsCalled()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 9a5c6c6..6e119c0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -60,6 +60,7 @@ import javax.net.ssl.SSLSocket;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.DataSerializer;
+import org.apache.geode.GemFireConfigException;
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.PoolFactory;
@@ -116,6 +117,8 @@ public class HandShake implements ClientHandShake {
 
   protected static final byte REPLY_AUTH_NOT_REQUIRED = (byte) 66;
 
+  public static final byte REPLY_SERVER_IS_LOCATOR = (byte) 67;
+
   private static SecurityService securityService = IntegratedSecurityService.getSecurityService();
 
   private byte code;
@@ -124,11 +127,6 @@ public class HandShake implements ClientHandShake {
   private boolean isRead = false;
   protected final DistributedSystem system;
 
-  /** Singleton for client side */
-  // Client has no more a singleton handShake instance. Now each connection will
-  // have its own handShake instance.
-  private static HandShake handshake;
-
   final protected ClientProxyMembershipID id;
 
   private Properties credentials;
@@ -251,7 +249,9 @@ public class HandShake implements ClientHandShake {
     id = null;
   }
 
-  /** Constructor used by server side connection */
+  /**
+   * HandShake Constructor used by server side connection
+   */
   public HandShake(Socket sock, int timeout, DistributedSystem sys, Version clientVersion,
       byte communicationMode) throws IOException, AuthenticationRequiredException {
     this.clientVersion = clientVersion;
@@ -325,12 +325,9 @@ public class HandShake implements ClientHandShake {
     return this.clientVersion;
   }
 
-  // private void initSingleton(DistributedSystem sys) {
-  // id = ClientProxyMembershipID.getNewProxyMembership();
-  // this.system = sys;
-  // this.code = REPLY_OK;
-  // }
-
+  /**
+   * Client-side handshake. This form of HandShake can communicate with a server
+   */
   public HandShake(ClientProxyMembershipID id, DistributedSystem sys) {
     this.id = id;
     this.code = REPLY_OK;
@@ -343,6 +340,9 @@ public class HandShake implements ClientHandShake {
     this.id.updateID(idm);
   }
 
+  /**
+   * Clone a HandShake to be used in creating other connections
+   */
   public HandShake(HandShake handShake) {
     this.appSecureMode = handShake.appSecureMode;
     this.clientConflation = handShake.clientConflation;
@@ -362,19 +362,6 @@ public class HandShake implements ClientHandShake {
     this._encrypt = null;
   }
 
-  /*
-   * private void read(DataInputStream dis,byte[] toFill) throws IOException { /* this.code = (byte)
-   * in.read(); if (this.code == -1) { throw new
-   * IOException(LocalizedStrings.HandShake_HANDSHAKE_READ_AT_END_OF_STREAM.toLocalizedString()); }
-   * if (this.code != REPLY_OK) { throw new
-   * IOException(LocalizedStrings.HandShake_HANDSHAKE_REPLY_CODE_IS_NOT_OK.toLocalizedString()); }
-   * try { ObjectInput in = new ObjectInputStream(is); this.id =
-   * ClientProxyMembershipID.readCanonicalized(in); } catch(IOException ioe) { this.code = -2; throw
-   * ioe; } catch(ClassNotFoundException cnfe) { this.code = -3; IOException e = new
-   * IOException(LocalizedStrings.HandShake_ERROR_DESERIALIZING_HANDSHAKE.toLocalizedString());
-   * e.initCause(cnfe); throw e; } } finally { synchronized (this) { this.isRead = true; } } }
-   */
-
   // used by the client side
   private byte setClientConflation() {
     byte result = CONFLATION_DEFAULT;
@@ -455,7 +442,10 @@ public class HandShake implements ClientHandShake {
     }
   }
 
-  public byte write(DataOutputStream dos, DataInputStream dis, byte communicationMode,
+  /**
+   * client-to-server handshake. Nothing is sent to the server prior to invoking this method.
+   */
+  private byte write(DataOutputStream dos, DataInputStream dis, byte communicationMode,
       int replyCode, int readTimeout, List ports, Properties p_credentials,
       DistributedMember member, boolean isCallbackConnection) throws IOException {
     HeapDataOutputStream hdos = new HeapDataOutputStream(32, Version.CURRENT);
@@ -529,17 +519,7 @@ public class HandShake implements ClientHandShake {
   }
 
   /**
-   * 
    * This assumes that authentication is the last piece of info in handshake
-   * 
-   * @param dos
-   * @param dis
-   * @param p_credentials
-   * @param isNotification
-   * @param member
-   * @param heapdos stream to append data to.
-   * @throws IOException
-   * @throws GemFireSecurityException
    */
   public void writeCredentials(DataOutputStream dos, DataInputStream dis, Properties p_credentials,
       boolean isNotification, DistributedMember member, HeapDataOutputStream heapdos)
@@ -666,15 +646,6 @@ public class HandShake implements ClientHandShake {
    * This method writes what readCredential() method expects to read. (Note the use of singular
    * credential). It is similar to writeCredentials(), except that it doesn't write
    * credential-properties.
-   * 
-   * @param dos
-   * @param dis
-   * @param authInit
-   * @param isNotification
-   * @param member
-   * @param heapdos
-   * @throws IOException
-   * @throws GemFireSecurityException
    */
   public byte writeCredential(DataOutputStream dos, DataInputStream dis, String authInit,
       boolean isNotification, DistributedMember member, HeapDataOutputStream heapdos)
@@ -1201,20 +1172,23 @@ public class HandShake implements ClientHandShake {
    * @param sock the socket this handshake is operating on
    * @return temporary id to reprent the other vm
    */
-  private DistributedMember getDistributedMember(Socket sock) {
+  private DistributedMember getIDForSocket(Socket sock) {
     return new InternalDistributedMember(sock.getInetAddress(), sock.getPort(), false);
   }
 
-  public ServerQueueStatus greet(Connection conn, ServerLocation location, byte communicationMode)
-      throws IOException, AuthenticationRequiredException, AuthenticationFailedException,
-      ServerRefusedConnectionException {
+  /**
+   * Client-side handshake with a Server
+   */
+  public ServerQueueStatus handshakeWithServer(Connection conn, ServerLocation location,
+      byte communicationMode) throws IOException, AuthenticationRequiredException,
+      AuthenticationFailedException, ServerRefusedConnectionException {
     try {
       ServerQueueStatus serverQStatus = null;
       Socket sock = conn.getSocket();
       DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
       final InputStream in = sock.getInputStream();
       DataInputStream dis = new DataInputStream(in);
-      DistributedMember member = getDistributedMember(sock);
+      DistributedMember member = getIDForSocket(sock);
       // if running in a loner system, use the new port number in the ID to
       // help differentiate from other clients
       DM dm = ((InternalDistributedSystem) this.system).getDistributionManager();
@@ -1246,6 +1220,10 @@ public class HandShake implements ClientHandShake {
         throw new AuthenticationRequiredException(
             LocalizedStrings.HandShake_SERVER_EXPECTING_SSL_CONNECTION.toLocalizedString());
       }
+      if (acceptanceCode == REPLY_SERVER_IS_LOCATOR) {
+        throw new GemFireConfigException("Improperly configured client detected.  " + "Server at "
+            + location + " is actually a locator.  Use addPoolLocator to configure locators.");
+      }
 
       // Successful handshake for GATEWAY_TO_GATEWAY mode sets the peer version in connection
       if (communicationMode == Acceptor.GATEWAY_TO_GATEWAY
@@ -1309,7 +1287,11 @@ public class HandShake implements ClientHandShake {
     }
   }
 
-  public ServerQueueStatus greetNotifier(Socket sock, boolean isPrimary)
+  /**
+   * Used by client-side CacheClientUpdater to handshake with a server in order to receive messages
+   * generated by subscriptions (register-interest, continuous query)
+   */
+  public ServerQueueStatus handshakeWithSubscriptionFeed(Socket sock, boolean isPrimary)
       throws IOException, AuthenticationRequiredException, AuthenticationFailedException,
       ServerRefusedConnectionException, ClassNotFoundException {
     ServerQueueStatus sqs = null;
@@ -1317,7 +1299,7 @@ public class HandShake implements ClientHandShake {
       DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
       final InputStream in = sock.getInputStream();
       DataInputStream dis = new DataInputStream(in);
-      DistributedMember member = getDistributedMember(sock);
+      DistributedMember member = getIDForSocket(sock);
       if (!this.multiuserSecureMode) {
         this.credentials = getCredentials(member);
       }
@@ -1466,12 +1448,6 @@ public class HandShake implements ClientHandShake {
       return false;
     final HandShake that = (HandShake) other;
 
-    /*
-     * if (identity != null && identity.length > 0) { for (int i = 0; i < identity.length; i++) { if
-     * (this.identity[i] != that.identity[i]) return false; } } if (this.code != that.code) return
-     * false; return true;
-     */
-
     if (this.id.isSameDSMember(that.id) && this.code == that.code) {
       return true;
     } else {

http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index 82f30bf..391653c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -21,6 +21,10 @@ import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.geode.GemFireConfigException;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -743,6 +747,15 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
     }
   }
 
+  @Test(expected = GemFireConfigException.class)
+  public void clientIsPreventedFromConnectingToLocatorAsServer() throws Exception {
+    IgnoredException.addIgnoredException("Improperly configured client detected");
+    ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+    clientCacheFactory.addPoolServer("localhost", DistributedTestUtils.getDUnitLocatorPort());
+    clientCacheFactory.setPoolSubscriptionEnabled(true);
+    getClientCache(clientCacheFactory);
+  }
+
 
   private void createCache(Properties props) throws Exception {
     createCacheV(props);
@@ -1317,22 +1330,6 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
     }
   }
 
-  @Override
-  public final void postTearDownCacheTestCase() throws Exception {
-    // close the clients first
-    closeCacheAndDisconnect();
-    // then close the servers
-    server1.invoke(() -> ClientServerMiscDUnitTest.closeCacheAndDisconnect());
-  }
-
-  public static void closeCacheAndDisconnect() {
-    Cache cache = new ClientServerMiscDUnitTest().getCache();
-    if (cache != null && !cache.isClosed()) {
-      cache.close();
-      cache.getDistributedSystem().disconnect();
-    }
-  }
-
   /**
    * set the boolean for starting the dispatcher thread a bit later to FALSE. This is just a
    * precaution in case any test set it to true and did not unset it on completion.