You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/08/07 19:49:07 UTC

[geode] branch develop updated: GEODE-5521: lastResultReceived flag set (#2255)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 82ac395  GEODE-5521: lastResultReceived flag set (#2255)
82ac395 is described below

commit 82ac395f6c88adfc6e2f2c71ef396b154c9e08e6
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Tue Aug 7 12:49:02 2018 -0700

    GEODE-5521: lastResultReceived flag set (#2255)
    
            * When an exception is received lastResultReceived flag is set.
    	* Previously this flag is only set if exception is of type QueryInvocationTargetException or FunctionInvocationTargetException
    	* Setting this flag will prevent other threads from sending results to the client after the exception is sent.
---
 .../ServerToClientFunctionResultSender.java        | 23 +++++-
 .../ServerToClientFunctionResultSender65.java      |  9 +++
 .../internal/cache/tier/sockets/BaseCommand.java   | 16 ++++
 .../sockets/command/ExecuteRegionFunction61.java   | 22 ++---
 .../sockets/command/ExecuteRegionFunction65.java   | 22 ++---
 .../sockets/command/ExecuteRegionFunction66.java   | 23 +++---
 ...verToClientFunctionResultSender65JUnitTest.java | 26 ++++++
 ...erverToClientFunctionResultSenderJUnitTest.java | 63 +++++++++++++++
 .../cache/tier/sockets/BaseCommandJUnitTest.java   | 94 ++++++++++++++++++++++
 9 files changed, 268 insertions(+), 30 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java
index a50a6d7..2e760e7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java
@@ -55,6 +55,10 @@ public class ServerToClientFunctionResultSender implements ResultSender {
 
   protected AtomicBoolean alreadySendException = new AtomicBoolean(false);
 
+  public synchronized void setLastResultReceived(boolean lastResultReceived) {
+    this.lastResultReceived = lastResultReceived;
+  }
+
   protected boolean lastResultReceived;
 
   protected ByteBuffer commBuffer;
@@ -81,6 +85,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
   }
 
   public synchronized void lastResult(Object oneResult) {
+    if (lastResultReceived) {
+      return;
+    }
     this.lastResultReceived = true;
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
@@ -90,9 +97,7 @@ public class ServerToClientFunctionResultSender implements ResultSender {
       }
       return;
     }
-    if (this.lastResultReceived) {
-      return;
-    }
+
     if (logger.isDebugEnabled()) {
       logger.debug("ServerToClientFunctionResultSender sending last result1 {} " + oneResult);
     }
@@ -129,6 +134,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
   }
 
   public synchronized void lastResult(Object oneResult, DistributedMember memberID) {
+    if (lastResultReceived) {
+      return;
+    }
     this.lastResultReceived = true;
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
@@ -173,6 +181,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
   }
 
   public synchronized void sendResult(Object oneResult) {
+    if (lastResultReceived) {
+      return;
+    }
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -213,6 +224,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
   }
 
   public synchronized void sendResult(Object oneResult, DistributedMember memberID) {
+    if (lastResultReceived) {
+      return;
+    }
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -294,6 +308,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
   }
 
   public synchronized void setException(Throwable exception) {
+    if (lastResultReceived) {
+      return;
+    }
     this.lastResultReceived = true;
     if (logger.isDebugEnabled()) {
       logger.debug("ServerToClientFunctionResultSender setting exception {} ", exception);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java
index eb3f0ca..b9ba438 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java
@@ -110,6 +110,9 @@ public class ServerToClientFunctionResultSender65 extends ServerToClientFunction
 
   @Override
   public synchronized void lastResult(Object oneResult, DistributedMember memberID) {
+    if (lastResultReceived) {
+      return;
+    }
     this.lastResultReceived = true;
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
@@ -171,6 +174,9 @@ public class ServerToClientFunctionResultSender65 extends ServerToClientFunction
 
   @Override
   public synchronized void sendResult(Object oneResult) {
+    if (lastResultReceived) {
+      return;
+    }
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -214,6 +220,9 @@ public class ServerToClientFunctionResultSender65 extends ServerToClientFunction
 
   @Override
   public synchronized void sendResult(Object oneResult, DistributedMember memberID) {
+    if (lastResultReceived) {
+      return;
+    }
     if (!isOkayToSendResult()) {
       if (logger.isDebugEnabled()) {
         logger.debug(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 82cd631..8d166e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -68,6 +68,7 @@ import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.Token;
 import org.apache.geode.internal.cache.VersionTagHolder;
+import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.InterestType;
@@ -137,6 +138,21 @@ public abstract class BaseCommand implements Command {
     return OK_BYTES;
   }
 
+  protected boolean setLastResultReceived(
+      ServerToClientFunctionResultSender resultSender) {
+
+    if (resultSender != null) {
+      synchronized (resultSender) {
+        if (resultSender.isLastResultReceived()) {
+          return false;
+        } else {
+          resultSender.setLastResultReceived(true);
+        }
+      }
+    }
+    return true;
+  }
+
   @Override
   public void execute(Message clientMessage, ServerConnection serverConnection,
       SecurityService securityService) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java
index 51ab798..023c41c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java
@@ -256,18 +256,22 @@ public class ExecuteRegionFunction61 extends BaseCommand {
           }
           resultSender.setException(fe);
         } else {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
-              function), fe);
-          sendException(hasResult, clientMessage, message, serverConnection, fe);
+          if (setLastResultReceived(resultSender)) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+                function), fe);
+            sendException(hasResult, clientMessage, message, serverConnection, fe);
+          }
         }
 
       } catch (Exception e) {
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
-            function), e);
-        String message = e.getMessage();
-        sendException(hasResult, clientMessage, message, serverConnection, e);
+        if (setLastResultReceived(resultSender)) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+              function), e);
+          String message = e.getMessage();
+          sendException(hasResult, clientMessage, message, serverConnection, e);
+        }
       }
 
       finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java
index d2ee02c..f08e4fe 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java
@@ -298,18 +298,22 @@ public class ExecuteRegionFunction65 extends BaseCommand {
 
         resultSender.setException(fe);
       } else {
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
-            function), fe);
-        sendException(hasResult, clientMessage, message, serverConnection, fe);
+        if (setLastResultReceived(resultSender)) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+              function), fe);
+          sendException(hasResult, clientMessage, message, serverConnection, fe);
+        }
       }
 
     } catch (Exception e) {
-      logger.warn(LocalizedMessage.create(
-          LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
-          function), e);
-      String message = e.getMessage();
-      sendException(hasResult, clientMessage, message, serverConnection, e);
+      if (setLastResultReceived(resultSender)) {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+            function), e);
+        String message = e.getMessage();
+        sendException(hasResult, clientMessage, message, serverConnection, e);
+      }
     } finally {
       handshake.setClientReadTimeout(earlierClientReadTimeout);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
index 042e998..c693771 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
@@ -321,24 +321,29 @@ public class ExecuteRegionFunction66 extends BaseCommand {
 
         resultSender.setException(fe);
       } else {
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
-            function), fe);
-        sendException(hasResult, clientMessage, message, serverConnection, fe);
+        if (setLastResultReceived(resultSender)) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+              function), fe);
+          sendException(hasResult, clientMessage, message, serverConnection, fe);
+        }
       }
 
     } catch (Exception e) {
-      logger.warn(LocalizedMessage.create(
-          LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
-          function), e);
-      String message = e.getMessage();
-      sendException(hasResult, clientMessage, message, serverConnection, e);
+      if (setLastResultReceived(resultSender)) {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+            function), e);
+        String message = e.getMessage();
+        sendException(hasResult, clientMessage, message, serverConnection, e);
+      }
     } finally {
       handshake.setClientReadTimeout(earlierClientReadTimeout);
       ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0);
     }
   }
 
+
   private void sendException(byte hasResult, Message msg, String message,
       ServerConnection serverConnection, Throwable e) throws IOException {
     synchronized (msg) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65JUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65JUnitTest.java
new file mode 100644
index 0000000..90d94f8
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65JUnitTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.mockito.Mockito.mock;
+
+public class ServerToClientFunctionResultSender65JUnitTest
+    extends ServerToClientFunctionResultSenderJUnitTest {
+
+  @Override
+  protected ServerToClientFunctionResultSender getResultSender() {
+    return mock(ServerToClientFunctionResultSender65.class);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSenderJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSenderJUnitTest.java
new file mode 100644
index 0000000..89ee3ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSenderJUnitTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedMember;
+
+public class ServerToClientFunctionResultSenderJUnitTest {
+
+  protected ServerToClientFunctionResultSender getResultSender() {
+    return mock(ServerToClientFunctionResultSender.class);
+  }
+
+  @Test
+  public void whenLastResultReceivedIsSetThenLastResultMustReturnImmediately() {
+    ServerToClientFunctionResultSender resultSender = getResultSender();
+    resultSender.lastResultReceived = true;
+
+    Object object = mock(Object.class);
+    DistributedMember memberId = mock(DistributedMember.class);
+    resultSender.lastResult(object, memberId);
+    verify(resultSender, times(0)).isOkayToSendResult();
+
+
+    resultSender.lastResult(object);
+    verify(resultSender, times(0)).isOkayToSendResult();
+
+  }
+
+  @Test
+  public void whenLastResultReceivedIsSetThenSendResultMustReturnImmediately() {
+    ServerToClientFunctionResultSender resultSender = getResultSender();
+    resultSender.lastResultReceived = true;
+
+    Object object = mock(Object.class);
+    DistributedMember memberId = mock(DistributedMember.class);
+    resultSender.sendResult(object, memberId);
+    verify(resultSender, times(0)).isOkayToSendResult();
+
+
+    resultSender.sendResult(object);
+    verify(resultSender, times(0)).isOkayToSendResult();
+
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandJUnitTest.java
new file mode 100644
index 0000000..d8ff5c0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandJUnitTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
+import org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction61;
+import org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction65;
+import org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction66;
+
+@RunWith(JUnitParamsRunner.class)
+public class BaseCommandJUnitTest {
+
+  public BaseCommand[] getCommands() {
+    return new BaseCommand[] {(BaseCommand) ExecuteRegionFunction61.getCommand(),
+        (BaseCommand) ExecuteRegionFunction65
+            .getCommand(),
+        (BaseCommand) ExecuteRegionFunction66.getCommand()};
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsSetThenCheckAndSetLastResultSentIfValidMustReturnTrue(
+      BaseCommand baseCommand) {
+
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(true);
+    assertFalse(baseCommand.setLastResultReceived(resultSender));
+
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsNotSetThenCheckAndSetLastResultSentIfValidMustReturnFalse(
+      BaseCommand baseCommand) {
+
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(false);
+    assertTrue(baseCommand.setLastResultReceived(resultSender));
+
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsNotSetThenCheckAndSetLastResultSentIfValidMustSetIt(
+      BaseCommand baseCommand) {
+
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(false);
+    baseCommand.setLastResultReceived(resultSender);
+    verify(resultSender, times(1)).setLastResultReceived(true);
+
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsSetThenCheckAndSetLastResultSentIfValidMustNotSetIt(
+      BaseCommand baseCommand) {
+
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(true);
+    baseCommand.setLastResultReceived(resultSender);
+    verify(resultSender, times(0)).setLastResultReceived(true);
+
+  }
+
+}