You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@geode.apache.org by pi...@apache.org on 2018/03/02 19:43:14 UTC

[geode] branch develop updated: GEODE-4401: Add disconnect client message to server and driver. (#1525)

This is an automated email from the ASF dual-hosted git repository.

pivotalsarge pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b9d9b38  GEODE-4401: Add disconnect client message to server and driver. (#1525)
b9d9b38 is described below

commit b9d9b38e42686a1973cee51f89053eb8b2519ec4
Author: Michael "Sarge" Dodge <md...@pivotal.io>
AuthorDate: Fri Mar 2 11:43:12 2018 -0800

    GEODE-4401: Add disconnect client message to server and driver. (#1525)
    
    * GEODE-4401: Add disconnect client message to server and driver.
    
    * GEODE-4401: Address review comments.
    
    * GEODE-4401: Update the driver to use the channel instead of a socket.
---
 .../geode/experimental/driver/ProtobufDriver.java  |  29 +++--
 .../experimental/driver/DriverConnectionTest.java  |   6 -
 .../src/main/proto/v1/clientProtocol.proto         |   3 +
 .../src/main/proto/v1/connection_API.proto         |   8 ++
 .../DisconnectClientRequestOperationHandler.java   |  43 +++++++
 .../registry/ProtobufOperationContextRegistry.java |   8 +-
 .../v1/DisconnectClientIntegrationTest.java        | 137 +++++++++++++++++++++
 7 files changed, 218 insertions(+), 16 deletions(-)

diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
index 40e435c..9b233f7 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
@@ -15,20 +15,16 @@
 package org.apache.geode.experimental.driver;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
-import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
-import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionNamesRequest;
 
@@ -84,9 +80,24 @@ public class ProtobufDriver implements Driver {
   @Override
   public void close() {
     try {
-      this.channel.close();
-    } catch (IOException e) {
-      // ignore
+      final Message disconnectClientRequest = ClientProtocol.Message.newBuilder()
+          .setDisconnectClientRequest(
+              ConnectionAPI.DisconnectClientRequest.newBuilder().setReason("Driver closed"))
+          .build();
+      final ConnectionAPI.DisconnectClientResponse disconnectClientResponse =
+          channel.sendRequest(disconnectClientRequest, MessageTypeCase.DISCONNECTCLIENTRESPONSE)
+              .getDisconnectClientResponse();
+      if (Objects.isNull(disconnectClientResponse)) {
+        // The server did not acknowledge the disconnect request; ignore for now.
+      }
+    } catch (IOException ioe) {
+      // NOP
+    } finally {
+      try {
+        this.channel.close();
+      } catch (IOException e) {
+        // ignore
+      }
     }
   }
 
diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java
index c840d9e..6e03607 100644
--- a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java
+++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java
@@ -15,9 +15,7 @@
 package org.apache.geode.experimental.driver;
 
 import static org.apache.geode.internal.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -31,11 +29,9 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.Locator;
-import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
@@ -99,6 +95,4 @@ public class DriverConnectionTest {
     driver.close();
     assertFalse(driver.isConnected());
   }
-
-
 }
diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
index 184d6c4..22ab5c4 100644
--- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
@@ -72,6 +72,9 @@ message Message {
 
         KeySetRequest keySetRequest = 28;
         KeySetResponse keySetResponse = 29;
+
+        DisconnectClientRequest disconnectClientRequest = 30;
+        DisconnectClientResponse disconnectClientResponse = 31;
     }
 }
 
diff --git a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
index 7c4435e..66e2cdd 100644
--- a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
@@ -23,3 +23,11 @@ message AuthenticationRequest {
 message AuthenticationResponse {
     bool authenticated = 1;
 }
+
+message DisconnectClientRequest {
+    string reason = 1;
+}
+
+message DisconnectClientResponse {
+    // message presence indicates success.
+}
\ No newline at end of file
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java
new file mode 100644
index 0000000..fa0f8f5
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.Result;
+import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionTerminatingStateProcessor;
+
+public class DisconnectClientRequestOperationHandler implements
+    ProtobufOperationHandler<ConnectionAPI.DisconnectClientRequest, ConnectionAPI.DisconnectClientResponse> {
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public Result<ConnectionAPI.DisconnectClientResponse> process(
+      ProtobufSerializationService serializationService,
+      ConnectionAPI.DisconnectClientRequest request,
+      MessageExecutionContext messageExecutionContext) {
+    logger.info("Client disconnecting due to {}", request.getReason());
+    messageExecutionContext
+        .setConnectionStateProcessor(new ProtobufConnectionTerminatingStateProcessor());
+
+    return Success.of(ConnectionAPI.DisconnectClientResponse.newBuilder().build());
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 3bca6fe..13861d1 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -23,7 +23,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
-import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.operations.DisconnectClientRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler;
@@ -72,6 +72,12 @@ public class ProtobufOperationContextRegistry {
             opsResp -> ClientProtocol.Message.newBuilder().setAuthenticationResponse(opsResp),
             this::skipAuthorizationCheck));
 
+    operationContexts.put(ClientProtocol.Message.MessageTypeCase.DISCONNECTCLIENTREQUEST,
+        new ProtobufOperationContext<>(ClientProtocol.Message::getDisconnectClientRequest,
+            new DisconnectClientRequestOperationHandler(),
+            opsResp -> ClientProtocol.Message.newBuilder().setDisconnectClientResponse(opsResp),
+            this::skipAuthorizationCheck));
+
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetRequest,
             new GetRequestOperationHandler(),
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java
new file mode 100644
index 0000000..5b47c8f
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class DisconnectClientIntegrationTest {
+  public static final String SECURITY_PRINCIPAL = "principle";
+  private Socket socket;
+  private Cache cache;
+  private SecurityManager securityManager;
+
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  @Before
+  public void setUp() throws Exception {
+    CacheFactory cacheFactory = new CacheFactory(new Properties());
+    cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+    cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false");
+    cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false");
+
+    securityManager = mock(SecurityManager.class);
+    cacheFactory.setSecurityManager(securityManager);
+    when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL);
+    when(securityManager.authorize(eq(SECURITY_PRINCIPAL), any())).thenReturn(true);
+
+    cache = cacheFactory.create();
+
+    CacheServer cacheServer = cache.addCacheServer();
+    int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
+
+    RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
+    regionFactory.setDataPolicy(DataPolicy.PARTITION);
+
+
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+
+    socket = new Socket("localhost", cacheServerPort);
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+
+    MessageUtil.performAndVerifyHandshake(socket);
+  }
+
+  @After
+  public void tearDown() {
+    cache.close();
+    try {
+      socket.close();
+    } catch (IOException ignore) {
+      // NOP
+    }
+  }
+
+  @Test
+  public void disconnectsFromServer() throws Exception {
+    authenticateWithServer();
+
+    final ClientProtocol.Message requestMessage = createRequestMessageBuilder(
+        ConnectionAPI.DisconnectClientRequest.newBuilder().setReason("Normal termination")).build();
+
+    final ClientProtocol.Message responseMessage = writeMessage(requestMessage);
+    assertEquals(responseMessage.toString(),
+        ClientProtocol.Message.MessageTypeCase.DISCONNECTCLIENTRESPONSE,
+        responseMessage.getMessageTypeCase());
+    final ConnectionAPI.DisconnectClientResponse disconnectClientResponse =
+        responseMessage.getDisconnectClientResponse();
+    assertNotNull(disconnectClientResponse);
+  }
+
+  private void authenticateWithServer() throws IOException {
+    ClientProtocol.Message.Builder request = ClientProtocol.Message.newBuilder()
+        .setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
+            .putCredentials(ResourceConstants.USER_NAME, "someuser")
+            .putCredentials(ResourceConstants.PASSWORD, "somepassword"));
+
+    ClientProtocol.Message response = writeMessage(request.build());
+    assertTrue(response.getAuthenticationResponse().getAuthenticated());
+  }
+
+  private ClientProtocol.Message.Builder createRequestMessageBuilder(
+      ConnectionAPI.DisconnectClientRequest.Builder disconnectClientRequest) {
+    return ClientProtocol.Message.newBuilder().setDisconnectClientRequest(disconnectClientRequest);
+  }
+
+  private ClientProtocol.Message writeMessage(ClientProtocol.Message request) throws IOException {
+    request.writeDelimitedTo(socket.getOutputStream());
+
+    return ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
pivotalsarge@apache.org.