You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by pi...@apache.org on 2018/03/02 19:43:14 UTC
[geode] branch develop updated: GEODE-4401: Add disconnect client
message to server and driver. (#1525)
This is an automated email from the ASF dual-hosted git repository.
pivotalsarge pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new b9d9b38 GEODE-4401: Add disconnect client message to server and driver. (#1525)
b9d9b38 is described below
commit b9d9b38e42686a1973cee51f89053eb8b2519ec4
Author: Michael "Sarge" Dodge <md...@pivotal.io>
AuthorDate: Fri Mar 2 11:43:12 2018 -0800
GEODE-4401: Add disconnect client message to server and driver. (#1525)
* GEODE-4401: Add disconnect client message to server and driver.
* GEODE-4401: Address review comments.
* GEODE-4401: Update the driver to use the channel instead of a socket.
---
.../geode/experimental/driver/ProtobufDriver.java | 29 +++--
.../experimental/driver/DriverConnectionTest.java | 6 -
.../src/main/proto/v1/clientProtocol.proto | 3 +
.../src/main/proto/v1/connection_API.proto | 8 ++
.../DisconnectClientRequestOperationHandler.java | 43 +++++++
.../registry/ProtobufOperationContextRegistry.java | 8 +-
.../v1/DisconnectClientIntegrationTest.java | 137 +++++++++++++++++++++
7 files changed, 218 insertions(+), 16 deletions(-)
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
index 40e435c..9b233f7 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
@@ -15,20 +15,16 @@
package org.apache.geode.experimental.driver;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import org.apache.geode.annotations.Experimental;
-import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
-import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
-import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionNamesRequest;
@@ -84,9 +80,24 @@ public class ProtobufDriver implements Driver {
@Override
public void close() {
try {
- this.channel.close();
- } catch (IOException e) {
- // ignore
+ final Message disconnectClientRequest = ClientProtocol.Message.newBuilder()
+ .setDisconnectClientRequest(
+ ConnectionAPI.DisconnectClientRequest.newBuilder().setReason("Driver closed"))
+ .build();
+ final ConnectionAPI.DisconnectClientResponse disconnectClientResponse =
+ channel.sendRequest(disconnectClientRequest, MessageTypeCase.DISCONNECTCLIENTRESPONSE)
+ .getDisconnectClientResponse();
+ if (Objects.isNull(disconnectClientResponse)) {
+ // No acknowledgement is tolerated. NOTE(review): protobuf getters rarely return null - confirm this is reachable.
+ }
+ } catch (IOException ioe) {
+ // A failed disconnect request is ignored; the channel is still closed by the finally block below.
+ } finally {
+ try {
+ this.channel.close();
+ } catch (IOException e) {
+ // Nothing further is done if closing the channel itself fails.
+ }
}
}
diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java
index c840d9e..6e03607 100644
--- a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java
+++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java
@@ -15,9 +15,7 @@
package org.apache.geode.experimental.driver;
import static org.apache.geode.internal.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.Properties;
@@ -31,11 +29,9 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
-import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.test.junit.categories.IntegrationTest;
@Category(IntegrationTest.class)
@@ -99,6 +95,4 @@ public class DriverConnectionTest {
driver.close();
assertFalse(driver.isConnected());
}
-
-
}
diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
index 184d6c4..22ab5c4 100644
--- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
@@ -72,6 +72,9 @@ message Message {
KeySetRequest keySetRequest = 28;
KeySetResponse keySetResponse = 29;
+
+ DisconnectClientRequest disconnectClientRequest = 30;
+ DisconnectClientResponse disconnectClientResponse = 31;
}
}
diff --git a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
index 7c4435e..66e2cdd 100644
--- a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
@@ -23,3 +23,11 @@ message AuthenticationRequest {
message AuthenticationResponse {
bool authenticated = 1;
}
+
+message DisconnectClientRequest {
+ // Free-form text saying why the client is disconnecting; the server-side handler logs it.
+ string reason = 1;
+}
+
+message DisconnectClientResponse {
+ // Intentionally empty: the presence of this message in the reply indicates success.
+}
\ No newline at end of file
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java
new file mode 100644
index 0000000..fa0f8f5
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.Result;
+import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionTerminatingStateProcessor;
+
+/**
+ * Operation handler for {@code DisconnectClientRequest} messages. It logs the reason supplied by
+ * the client, switches the connection's state processor to a
+ * {@link ProtobufConnectionTerminatingStateProcessor}, and replies with an empty
+ * {@code DisconnectClientResponse}.
+ */
+public class DisconnectClientRequestOperationHandler implements
+ ProtobufOperationHandler<ConnectionAPI.DisconnectClientRequest, ConnectionAPI.DisconnectClientResponse> {
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * Handles a single disconnect request.
+ *
+ * @param serializationService not used by this handler
+ * @param request the disconnect request; only its reason is read, for logging
+ * @param messageExecutionContext the context whose connection state processor is replaced
+ * @return a successful result wrapping a default (empty) {@code DisconnectClientResponse}
+ */
+ @Override
+ public Result<ConnectionAPI.DisconnectClientResponse> process(
+ ProtobufSerializationService serializationService,
+ ConnectionAPI.DisconnectClientRequest request,
+ MessageExecutionContext messageExecutionContext) {
+ logger.info("Client disconnecting due to {}", request.getReason());
+ // NOTE(review): presumably the terminating state processor stops further messages from being
+ // handled on this connection -- confirm against ProtobufConnectionTerminatingStateProcessor.
+ messageExecutionContext
+ .setConnectionStateProcessor(new ProtobufConnectionTerminatingStateProcessor());
+
+ return Success.of(ConnectionAPI.DisconnectClientResponse.newBuilder().build());
+ }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 3bca6fe..13861d1 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -23,7 +23,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
-import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.operations.DisconnectClientRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler;
@@ -72,6 +72,12 @@ public class ProtobufOperationContextRegistry {
opsResp -> ClientProtocol.Message.newBuilder().setAuthenticationResponse(opsResp),
this::skipAuthorizationCheck));
+ operationContexts.put(ClientProtocol.Message.MessageTypeCase.DISCONNECTCLIENTREQUEST,
+ new ProtobufOperationContext<>(ClientProtocol.Message::getDisconnectClientRequest,
+ new DisconnectClientRequestOperationHandler(),
+ opsResp -> ClientProtocol.Message.newBuilder().setDisconnectClientResponse(opsResp),
+ this::skipAuthorizationCheck));
+
operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETREQUEST,
new ProtobufOperationContext<>(ClientProtocol.Message::getGetRequest,
new GetRequestOperationHandler(),
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java
new file mode 100644
index 0000000..5b47c8f
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Integration test for the protobuf {@code DisconnectClientRequest} message: it starts a cache
+ * server, talks to it over a raw socket, and checks that a disconnect request is answered with a
+ * {@code DisconnectClientResponse}.
+ */
+@Category(IntegrationTest.class)
+public class DisconnectClientIntegrationTest {
+ // Principal that the mocked SecurityManager returns for every authentication attempt.
+ public static final String SECURITY_PRINCIPAL = "principle";
+ private Socket socket;
+ private Cache cache;
+ private SecurityManager securityManager;
+
+ // setUp() sets a system property; this rule presumably restores the original values afterwards.
+ @Rule
+ public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+ /**
+ * Creates a cache with a mocked SecurityManager, starts a cache server on a random available
+ * TCP port, sets the protobuf protocol feature flag, and opens a socket to the server on which
+ * the handshake is performed.
+ */
+ @Before
+ public void setUp() throws Exception {
+ CacheFactory cacheFactory = new CacheFactory(new Properties());
+ cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+ cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false");
+ cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false");
+
+ // Accept any credentials as SECURITY_PRINCIPAL and authorize that principal for everything.
+ securityManager = mock(SecurityManager.class);
+ cacheFactory.setSecurityManager(securityManager);
+ when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL);
+ when(securityManager.authorize(eq(SECURITY_PRINCIPAL), any())).thenReturn(true);
+
+ cache = cacheFactory.create();
+
+ CacheServer cacheServer = cache.addCacheServer();
+ int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+ cacheServer.setPort(cacheServerPort);
+ cacheServer.start();
+
+ // NOTE(review): the factory is configured but no region is created from it in this class --
+ // confirm whether it is still needed.
+ RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
+ regionFactory.setDataPolicy(DataPolicy.PARTITION);
+
+
+ // The flag is set after the server has started but before the client socket connects.
+ System.setProperty("geode.feature-protobuf-protocol", "true");
+
+ socket = new Socket("localhost", cacheServerPort);
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+
+ MessageUtil.performAndVerifyHandshake(socket);
+ }
+
+ /** Closes the cache and then the client socket; a failure to close the socket is ignored. */
+ @After
+ public void tearDown() {
+ cache.close();
+ try {
+ socket.close();
+ } catch (IOException ignore) {
+ // NOP
+ }
+ }
+
+ /**
+ * Authenticates, sends a DisconnectClientRequest with the reason "Normal termination", and
+ * expects the reply to be a DisconnectClientResponse message.
+ */
+ @Test
+ public void disconnectsFromServer() throws Exception {
+ authenticateWithServer();
+
+ final ClientProtocol.Message requestMessage = createRequestMessageBuilder(
+ ConnectionAPI.DisconnectClientRequest.newBuilder().setReason("Normal termination")).build();
+
+ final ClientProtocol.Message responseMessage = writeMessage(requestMessage);
+ // The full response text is passed as the assertion message to aid diagnosis on failure.
+ assertEquals(responseMessage.toString(),
+ ClientProtocol.Message.MessageTypeCase.DISCONNECTCLIENTRESPONSE,
+ responseMessage.getMessageTypeCase());
+ final ConnectionAPI.DisconnectClientResponse disconnectClientResponse =
+ responseMessage.getDisconnectClientResponse();
+ assertNotNull(disconnectClientResponse);
+ }
+
+ /**
+ * Sends an AuthenticationRequest with fixed credentials and asserts that the response reports
+ * the client as authenticated.
+ */
+ private void authenticateWithServer() throws IOException {
+ ClientProtocol.Message.Builder request = ClientProtocol.Message.newBuilder()
+ .setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
+ .putCredentials(ResourceConstants.USER_NAME, "someuser")
+ .putCredentials(ResourceConstants.PASSWORD, "somepassword"));
+
+ ClientProtocol.Message response = writeMessage(request.build());
+ assertTrue(response.getAuthenticationResponse().getAuthenticated());
+ }
+
+ /** Wraps the given disconnect request builder in a ClientProtocol.Message builder. */
+ private ClientProtocol.Message.Builder createRequestMessageBuilder(
+ ConnectionAPI.DisconnectClientRequest.Builder disconnectClientRequest) {
+ return ClientProtocol.Message.newBuilder().setDisconnectClientRequest(disconnectClientRequest);
+ }
+
+ /**
+ * Writes the request to the socket as a delimited protobuf message and then parses one delimited
+ * message from the same socket as the response.
+ */
+ private ClientProtocol.Message writeMessage(ClientProtocol.Message request) throws IOException {
+ request.writeDelimitedTo(socket.getOutputStream());
+
+ return ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
+ }
+}
--
To stop receiving notification emails like this one, please contact
pivotalsarge@apache.org.