Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/09/14 22:50:59 UTC

[GitHub] [geode] Bill commented on a change in pull request #6835: GEODE-9457: re-authentication in event dispatcher

Bill commented on a change in pull request #6835:
URL: https://github.com/apache/geode/pull/6835#discussion_r708663990



##########
File path: geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
##########
@@ -123,67 +119,89 @@ private AuthenticateUserOp() {
       super(MessageType.USER_CREDENTIAL_MESSAGE, 1);
       securityProperties = securityProps;
       needsServerLocation = needsServer;
-
       getMessage().setMessageHasSecurePartFlag();
     }
 
     @Override
-    protected void sendMessage(Connection cnx) throws Exception {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT);
-      byte[] secureBytes;
-      hdos.writeLong(cnx.getConnectionID());
-      if (securityProperties != null) {
-        DistributedMember server = new InternalDistributedMember(cnx.getSocket().getInetAddress(),
-            cnx.getSocket().getPort(), false);
-        DistributedSystem sys = InternalDistributedSystem.getConnectedInstance();
-        String authInitMethod = sys.getProperties().getProperty(SECURITY_CLIENT_AUTH_INIT);
-
-        Properties credentials = Handshake.getCredentials(authInitMethod, securityProperties,
-            server, false, sys.getLogWriter(), sys.getSecurityLogWriter());
-        byte[] credentialBytes;
-        try (HeapDataOutputStream heapdos = new HeapDataOutputStream(KnownVersion.CURRENT)) {
-          DataSerializer.writeProperties(credentials, heapdos);
-          credentialBytes = ((ConnectionImpl) cnx).encryptBytes(heapdos.toByteArray());
-        }
-        getMessage().addBytesPart(credentialBytes);
+    protected void sendMessage(Connection connection) throws Exception {
+      if (securityProperties == null) {
+        securityProperties = getConnectedSystem().getSecurityProperties();
       }
-      try {
-        secureBytes = ((ConnectionImpl) cnx).encryptBytes(hdos.toByteArray());
-      } finally {
-        hdos.close();
+      byte[] credentialBytes = getCredentialBytes(connection, securityProperties);
+      getMessage().addBytesPart(credentialBytes);
+
+      try (HeapDataOutputStream hdos = new HeapDataOutputStream(16, KnownVersion.CURRENT)) {
+        hdos.writeLong(connection.getConnectionID());
+        hdos.writeLong(getUserId(connection));
+        getMessage().setSecurePart(((ConnectionImpl) connection).encryptBytes(hdos.toByteArray()));
       }
-      getMessage().setSecurePart(secureBytes);
       getMessage().send(false);
     }
 
+    protected long getUserId(Connection connection) {
+      // single user mode
+      if (UserAttributes.userAttributes.get() == null) {
+        return connection.getServer().getUserId();
+      }
+      // multi user mode
+      Long id = UserAttributes.userAttributes.get().getServerToId().get(connection.getServer());
+      if (id == null) {
+        return -1L;

Review comment:
       There are a lot of `-1L` and `-1` literals added in this PR. Have you considered adding a constant like `NOT_A_USER_ID` or something like that?
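
A minimal sketch of that suggestion. The name `NOT_A_USER_ID` comes from the comment above; the holder class `UserIdSketch`, the `String`-keyed map, and the `lookup` helper are hypothetical stand-ins for illustration and are not part of the PR or of Geode.

```java
import java.util.Map;

/** Hypothetical holder for the sentinel; not an existing Geode class. */
final class UserIdSketch {
  /** Sentinel meaning "no user id is known"; replaces bare -1L literals. */
  static final long NOT_A_USER_ID = -1L;

  private UserIdSketch() {}

  /** Simplified stand-in for the multi-user branch of getUserId(). */
  static long lookup(Map<String, Long> serverToId, String server) {
    Long id = serverToId.get(server);
    // A named constant documents the intent at every call site.
    return id == null ? NOT_A_USER_ID : id;
  }
}
```

Callers can then compare against `UserIdSketch.NOT_A_USER_ID` instead of repeating the literal.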

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientReAuthenticateMessage.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.tier.MessageType;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.internal.serialization.SerializationContext;
+
+public class ClientReAuthenticateMessage implements ClientMessage {
+
+  /**
+   * This {@code ClientMessage}'s {@code EventID}
+   */
+  private final EventID eventId;
+
+  public ClientReAuthenticateMessage() {
+    this(new EventID());
+  }
+
+  public ClientReAuthenticateMessage(EventID eventId) {
+    this.eventId = eventId;
+  }
+
+  @Override
+  public Message getMessage(CacheClientProxy proxy, boolean notify) throws IOException {
+    Message message = new Message(1, KnownVersion.CURRENT);
+    message.setMessageType(MessageType.CLIENT_RE_AUTHENTICATE);
+    message.setTransactionId(0);
+    message.addObjPart(eventId);
+    return message;
+  }
+
+  @Override
+  public boolean shouldBeConflated() {
+    return true;
+  }
+
+  @Override
+  public void toData(DataOutput out,
+      SerializationContext context) throws IOException {
+    eventId.toData(out, context);
+  }
+
+  @Override
+  public int getDSFID() {
+    return CLIENT_RE_AUTHENTICATE;
+  }
+
+  @Override
+  public void fromData(DataInput in,
+      DeserializationContext context) throws IOException, ClassNotFoundException {
+    eventId.fromData(in, context);
+  }
+
+  @Override
+  public EventID getEventId() {
+    return eventId;
+  }
+
+  @Override
+  public String getRegionToConflate() {
+    return "gemfire_reserved_region_name_for_client_re_authenticate";
+  }
+
+  @Override
+  public Object getKeyToConflate() {
+    // This method can be called by HARegionQueue.
+    // Use this to identify the message type.
+    return "re_authenticate";
+  }
+
+  @Override
+  public Object getValueToConflate() {
+    // This method can be called by HARegionQueue
+    // Use this to identify the message type.
+    return "re_authenticate";
+  }
+
+  @Override
+  public void setLatestValue(Object value) {}

Review comment:
       Should this throw `UnsupportedOperationException` the way `GatewaySenderEventImpl` does, instead of silently failing?
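
A rough sketch of what throwing could look like. The interface and class below are hypothetical, trimmed-down stand-ins (the real `ClientMessage` has more methods), and the exception message is an assumption.

```java
/** Hypothetical container so the sketch is self-contained. */
final class SetLatestValueSketch {
  private SetLatestValueSketch() {}

  /** Stand-in for the part of the conflation contract discussed here. */
  interface Conflatable {
    void setLatestValue(Object value);
  }

  /** Stand-in for a message that carries no value to replace. */
  static final class ReAuthenticateMessage implements Conflatable {
    @Override
    public void setLatestValue(Object value) {
      // Fail loudly so an unexpected caller is noticed instead of being ignored.
      throw new UnsupportedOperationException(
          "setLatestValue is not supported for re-authenticate messages");
    }
  }
}
```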

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -124,41 +141,34 @@
    */
   protected MessageDispatcher(CacheClientProxy proxy, String name,
       StatisticsClock statisticsClock) throws CacheException {
-    super(name);
-
-    _proxy = proxy;
-
-    // Create the event conflator
-    // this._eventConflator = new BridgeEventConflator
+    this(proxy, name, getMessageQueue(proxy, statisticsClock));
+  }
 
-    // Create the message queue
+  private static HARegionQueue getMessageQueue(CacheClientProxy proxy,
+      StatisticsClock statisticsClock) {

Review comment:
       Nice cleanup of the constructor!

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -362,31 +387,67 @@ protected void runDispatcher() {
           }
           waitForResumption();
         }
-        try {
-          clientMessage = (ClientMessage) _messageQueue.peek();
-        } catch (RegionDestroyedException skipped) {
-          break;
+
+        // if message is not delivered due to authentication expiation, this clientMessage
+        // would not be null.
+        if (clientMessage == null) {
+          try {
+            clientMessage = (ClientMessage) _messageQueue.peek();
+          } catch (RegionDestroyedException skipped) {
+            break;
+          }
         }
+
         getStatistics().setQueueSize(_messageQueue.size());
         if (isStopped()) {
           break;
         }
-        if (clientMessage != null) {
-          // Process the message
-          long start = getStatistics().startTime();
-          //// BUGFIX for BUG#38206 and BUG#37791
-          boolean isDispatched = dispatchMessage(clientMessage);
-          getStatistics().endMessage(start);
-          if (isDispatched) {
+
+        if (clientMessage == null) {
+          _messageQueue.remove();
+          continue;
+        }
+
+        // Process the message
+        long start = getStatistics().startTime();
+        try {
+          if (dispatchMessage(clientMessage)) {
+            getStatistics().endMessage(start);
             _messageQueue.remove();
             if (clientMessage instanceof ClientMarkerMessageImpl) {
               getProxy().setMarkerEnqueued(false);
             }
           }
-        } else {
+          clientMessage = null;
+          wait_for_re_auth_start_time = -1;
+        } catch (NotAuthorizedException notAuthorized) {
+          // behave as if the message is dispatched, remove from the queue
+          logger.info("skip delivering message: " + clientMessage, notAuthorized);
           _messageQueue.remove();
+          clientMessage = null;
+        } catch (AuthenticationExpiredException expired) {
+          if (wait_for_re_auth_start_time == -1) {
+            wait_for_re_auth_start_time = System.currentTimeMillis();
+            // only send the message to clients who can handle the message
+            if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) {
+              EventID eventId = createEventId();
+              sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+            }
+            // for older client, we still wait, just in case client will perform some operations to
+            // trigger credential refresh on its own.
+            Thread.sleep(200);
+          } else {
+            long elapsedTime = System.currentTimeMillis() - wait_for_re_auth_start_time;
+            if (elapsedTime > reAuthenticateWaitTime) {
+              logger.warn("Client did not re-authenticate back successfully in " + elapsedTime
+                  + "ms. Unregister this client proxy.");
+              pauseOrUnregisterProxy(expired);
+            } else {
+              Thread.sleep(200);

Review comment:
       I don't understand why the `sleep()` is needed here. Please explain.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -335,10 +356,14 @@ protected void runDispatcher() {
       logger.debug("{}: Beginning to process events", this);
     }
 
+    long reAuthenticateWaitTime =
+        getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, DEFAULT_RE_AUTHENTICATE_WAIT_TIME);
+
     ClientMessage clientMessage = null;
+    long wait_for_re_auth_start_time = -1;

Review comment:
       Please consider moving this variable declaration closer to where it's needed. It can be moved inside the `while` body: down to just before the `try` block at line 413.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -362,31 +387,67 @@ protected void runDispatcher() {
           }
           waitForResumption();
         }
-        try {
-          clientMessage = (ClientMessage) _messageQueue.peek();
-        } catch (RegionDestroyedException skipped) {
-          break;
+
+        // if message is not delivered due to authentication expiation, this clientMessage
+        // would not be null.
+        if (clientMessage == null) {
+          try {
+            clientMessage = (ClientMessage) _messageQueue.peek();
+          } catch (RegionDestroyedException skipped) {
+            break;
+          }
         }
+
         getStatistics().setQueueSize(_messageQueue.size());
         if (isStopped()) {
           break;
         }
-        if (clientMessage != null) {
-          // Process the message
-          long start = getStatistics().startTime();
-          //// BUGFIX for BUG#38206 and BUG#37791
-          boolean isDispatched = dispatchMessage(clientMessage);
-          getStatistics().endMessage(start);
-          if (isDispatched) {
+
+        if (clientMessage == null) {
+          _messageQueue.remove();
+          continue;
+        }
+
+        // Process the message
+        long start = getStatistics().startTime();
+        try {
+          if (dispatchMessage(clientMessage)) {
+            getStatistics().endMessage(start);
             _messageQueue.remove();
             if (clientMessage instanceof ClientMarkerMessageImpl) {
               getProxy().setMarkerEnqueued(false);
             }
           }
-        } else {
+          clientMessage = null;
+          wait_for_re_auth_start_time = -1;
+        } catch (NotAuthorizedException notAuthorized) {
+          // behave as if the message is dispatched, remove from the queue
+          logger.info("skip delivering message: " + clientMessage, notAuthorized);
           _messageQueue.remove();
+          clientMessage = null;
+        } catch (AuthenticationExpiredException expired) {
+          if (wait_for_re_auth_start_time == -1) {
+            wait_for_re_auth_start_time = System.currentTimeMillis();
+            // only send the message to clients who can handle the message
+            if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) {
+              EventID eventId = createEventId();
+              sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+            }
+            // for older client, we still wait, just in case client will perform some operations to
+            // trigger credential refresh on its own.

Review comment:
       The comment makes me think that this `Thread.sleep()` should be in an `else` clause, so that the sleep happens only if clients don't understand the re-authenticate message.
   
   Consider putting this sleep into an `else` block.
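
One possible reading of this suggestion, sketched with hypothetical names (`ReAuthFirstExpirySketch`, `onFirstExpiry`, `RETRY_SLEEP_MILLIS`). The method returns the sleep duration instead of calling `Thread.sleep()` so the branch logic is easy to check. Whether newer clients should also wait is a question for the PR author.

```java
/** Hypothetical model of the first-expiry branch; not the PR's code. */
final class ReAuthFirstExpirySketch {
  /** Matches the 200 ms literal used in the diff. */
  static final long RETRY_SLEEP_MILLIS = 200L;

  private ReAuthFirstExpirySketch() {}

  /** Returns how many milliseconds the dispatcher should sleep before retrying. */
  static long onFirstExpiry(boolean clientSupportsReAuth, Runnable sendReAuthenticateMessage) {
    if (clientSupportsReAuth) {
      // Newer client: ask it to re-authenticate and retry without sleeping here.
      sendReAuthenticateMessage.run();
      return 0L;
    } else {
      // Older client: wait in case it refreshes its credentials on its own.
      return RETRY_SLEEP_MILLIS;
    }
  }
}
```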

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
##########
@@ -1164,6 +1190,20 @@ public void removeUserAuth(Message message, boolean keepAlive) {
     }
   }
 
+  @VisibleForTesting
+  long putSubject(Subject subject, long existingUniqueId) {
+    long uniqueId;
+    uniqueId = clientUserAuths.putSubject(subject, existingUniqueId);

Review comment:
       Please combine the assignment on this line with the declaration on the previous one. 
   
   Incidentally, `uniqueId` can be `final`.
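
For illustration, the combined form could look like the sketch below. `UserAuthsStub` is a hypothetical stand-in for `clientUserAuths`, and its id-allocation rule is invented for the example and does not describe Geode's behavior. Only the single `final long uniqueId = ...` line reflects the suggestion.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical container so the sketch is self-contained. */
final class PutSubjectSketch {
  private PutSubjectSketch() {}

  /** Invented stand-in for clientUserAuths. */
  static final class UserAuthsStub {
    private final AtomicLong nextId = new AtomicLong(1);

    long putSubject(Object subject, long existingUniqueId) {
      // Illustrative rule only: keep a positive id, otherwise allocate a new one.
      return existingUniqueId > 0 ? existingUniqueId : nextId.getAndIncrement();
    }
  }

  static long putSubject(UserAuthsStub clientUserAuths, Object subject, long existingUniqueId) {
    // Declaration and assignment on one line; never reassigned, so it can be final.
    final long uniqueId = clientUserAuths.putSubject(subject, existingUniqueId);
    return uniqueId;
  }
}
```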

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -581,6 +656,33 @@ protected boolean dispatchMessage(ClientMessage clientMessage) throws IOExceptio
         logger.trace(msg.toString());
       }
 
+      // authorize the message before dispatching
+      String regionName = clientUpdateMessage.getRegionName();
+      Object key = clientUpdateMessage.getKeyOfInterest();
+      SecurityService securityService = getCache().getSecurityService();
+      ResourcePermission permission = new ResourcePermission(ResourcePermission.Resource.DATA,
+          ResourcePermission.Operation.READ,
+          regionName, key == null ? null : key.toString());

Review comment:
       `permission` isn't needed outside the body of the next `if`. And even then it is only needed conditionally (sometimes it isn't needed at all). And `regionName` and `key` are used only to compute `permission`.
   
   Please consider replacing lines 660, 661, 663-665 with a method that takes `clientUpdateMessage` and returns `ResourcePermission`, and then calling that method in the two places `permission` is currently used below. This will eliminate some spurious object allocation and make this code a little simpler.
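
A sketch of the extraction described above. `ReadPermission` and `UpdateMessage` are hypothetical stand-ins for `ResourcePermission` and the client update message, and the method name `readPermissionFor` is an assumption. The point is only that the permission is built on demand by the caller that needs it.

```java
/** Hypothetical container so the sketch is self-contained. */
final class PermissionSketch {
  private PermissionSketch() {}

  /** Stand-in for a DATA:READ ResourcePermission on a region and key. */
  static final class ReadPermission {
    final String regionName;
    final String key;

    ReadPermission(String regionName, String key) {
      this.regionName = regionName;
      this.key = key;
    }
  }

  /** Stand-in for the two accessors used in the diff. */
  interface UpdateMessage {
    String getRegionName();

    Object getKeyOfInterest();
  }

  /** Builds the permission only when a caller asks for it. */
  static ReadPermission readPermissionFor(UpdateMessage message) {
    Object key = message.getKeyOfInterest();
    return new ReadPermission(message.getRegionName(), key == null ? null : key.toString());
  }
}
```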

##########
File path: geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
##########
@@ -87,31 +101,13 @@ private AuthenticateUserOp() {
   }
 
   static class AuthenticateUserOpImpl extends AbstractOp {
-
+    private static final Logger logger = LogService.getLogger();

Review comment:
       Please remove this unused variable.



