You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/08/07 19:49:07 UTC
[geode] branch develop updated: GEODE-5521: lastResultReceived flag
set (#2255)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 82ac395 GEODE-5521: lastResultReceived flag set (#2255)
82ac395 is described below
commit 82ac395f6c88adfc6e2f2c71ef396b154c9e08e6
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Tue Aug 7 12:49:02 2018 -0700
GEODE-5521: lastResultReceived flag set (#2255)
* When an exception is received, the lastResultReceived flag is set.
* Previously, this flag was only set if the exception was of type QueryInvocationTargetException or FunctionInvocationTargetException.
* Setting this flag will prevent other threads from sending results to the client after the exception is sent.
---
.../ServerToClientFunctionResultSender.java | 23 +++++-
.../ServerToClientFunctionResultSender65.java | 9 +++
.../internal/cache/tier/sockets/BaseCommand.java | 16 ++++
.../sockets/command/ExecuteRegionFunction61.java | 22 ++---
.../sockets/command/ExecuteRegionFunction65.java | 22 ++---
.../sockets/command/ExecuteRegionFunction66.java | 23 +++---
...verToClientFunctionResultSender65JUnitTest.java | 26 ++++++
...erverToClientFunctionResultSenderJUnitTest.java | 63 +++++++++++++++
.../cache/tier/sockets/BaseCommandJUnitTest.java | 94 ++++++++++++++++++++++
9 files changed, 268 insertions(+), 30 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java
index a50a6d7..2e760e7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender.java
@@ -55,6 +55,10 @@ public class ServerToClientFunctionResultSender implements ResultSender {
protected AtomicBoolean alreadySendException = new AtomicBoolean(false);
+ public synchronized void setLastResultReceived(boolean lastResultReceived) {
+ this.lastResultReceived = lastResultReceived; // overwrites the flag while holding this sender's monitor (method is synchronized)
+ }
+
protected boolean lastResultReceived;
protected ByteBuffer commBuffer;
@@ -81,6 +85,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
}
public synchronized void lastResult(Object oneResult) {
+ if (lastResultReceived) {
+ return;
+ }
this.lastResultReceived = true;
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
@@ -90,9 +97,7 @@ public class ServerToClientFunctionResultSender implements ResultSender {
}
return;
}
- if (this.lastResultReceived) {
- return;
- }
+
if (logger.isDebugEnabled()) {
logger.debug("ServerToClientFunctionResultSender sending last result1 {} " + oneResult);
}
@@ -129,6 +134,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
}
public synchronized void lastResult(Object oneResult, DistributedMember memberID) {
+ if (lastResultReceived) {
+ return;
+ }
this.lastResultReceived = true;
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
@@ -173,6 +181,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
}
public synchronized void sendResult(Object oneResult) {
+ if (lastResultReceived) {
+ return;
+ }
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -213,6 +224,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
}
public synchronized void sendResult(Object oneResult, DistributedMember memberID) {
+ if (lastResultReceived) {
+ return;
+ }
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -294,6 +308,9 @@ public class ServerToClientFunctionResultSender implements ResultSender {
}
public synchronized void setException(Throwable exception) {
+ if (lastResultReceived) {
+ return;
+ }
this.lastResultReceived = true;
if (logger.isDebugEnabled()) {
logger.debug("ServerToClientFunctionResultSender setting exception {} ", exception);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java
index eb3f0ca..b9ba438 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65.java
@@ -110,6 +110,9 @@ public class ServerToClientFunctionResultSender65 extends ServerToClientFunction
@Override
public synchronized void lastResult(Object oneResult, DistributedMember memberID) {
+ if (lastResultReceived) {
+ return;
+ }
this.lastResultReceived = true;
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
@@ -171,6 +174,9 @@ public class ServerToClientFunctionResultSender65 extends ServerToClientFunction
@Override
public synchronized void sendResult(Object oneResult) {
+ if (lastResultReceived) {
+ return;
+ }
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -214,6 +220,9 @@ public class ServerToClientFunctionResultSender65 extends ServerToClientFunction
@Override
public synchronized void sendResult(Object oneResult, DistributedMember memberID) {
+ if (lastResultReceived) {
+ return;
+ }
if (!isOkayToSendResult()) {
if (logger.isDebugEnabled()) {
logger.debug(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 82cd631..8d166e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -68,6 +68,7 @@ import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.VersionTagHolder;
+import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.InterestType;
@@ -137,6 +138,21 @@ public abstract class BaseCommand implements Command {
return OK_BYTES;
}
+ protected boolean setLastResultReceived(
+ ServerToClientFunctionResultSender resultSender) {
+ // Check-and-set of the sender's lastResultReceived flag; returns false only when it was already set.
+ if (resultSender != null) {
+ synchronized (resultSender) { // check and set happen under the sender's monitor
+ if (resultSender.isLastResultReceived()) {
+ return false; // already set: the ExecuteRegionFunction callers skip sendException in this case
+ } else {
+ resultSender.setLastResultReceived(true);
+ }
+ }
+ }
+ return true; // also reached when resultSender is null, in which case no flag is recorded
+ }
+
@Override
public void execute(Message clientMessage, ServerConnection serverConnection,
SecurityService securityService) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java
index 51ab798..023c41c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java
@@ -256,18 +256,22 @@ public class ExecuteRegionFunction61 extends BaseCommand {
}
resultSender.setException(fe);
} else {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
- function), fe);
- sendException(hasResult, clientMessage, message, serverConnection, fe);
+ if (setLastResultReceived(resultSender)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+ function), fe);
+ sendException(hasResult, clientMessage, message, serverConnection, fe);
+ }
}
} catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
- function), e);
- String message = e.getMessage();
- sendException(hasResult, clientMessage, message, serverConnection, e);
+ if (setLastResultReceived(resultSender)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+ function), e);
+ String message = e.getMessage();
+ sendException(hasResult, clientMessage, message, serverConnection, e);
+ }
}
finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java
index d2ee02c..f08e4fe 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java
@@ -298,18 +298,22 @@ public class ExecuteRegionFunction65 extends BaseCommand {
resultSender.setException(fe);
} else {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
- function), fe);
- sendException(hasResult, clientMessage, message, serverConnection, fe);
+ if (setLastResultReceived(resultSender)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+ function), fe);
+ sendException(hasResult, clientMessage, message, serverConnection, fe);
+ }
}
} catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
- function), e);
- String message = e.getMessage();
- sendException(hasResult, clientMessage, message, serverConnection, e);
+ if (setLastResultReceived(resultSender)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+ function), e);
+ String message = e.getMessage();
+ sendException(hasResult, clientMessage, message, serverConnection, e);
+ }
} finally {
handshake.setClientReadTimeout(earlierClientReadTimeout);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
index 042e998..c693771 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
@@ -321,24 +321,29 @@ public class ExecuteRegionFunction66 extends BaseCommand {
resultSender.setException(fe);
} else {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
- function), fe);
- sendException(hasResult, clientMessage, message, serverConnection, fe);
+ if (setLastResultReceived(resultSender)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+ function), fe);
+ sendException(hasResult, clientMessage, message, serverConnection, fe);
+ }
}
} catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
- function), e);
- String message = e.getMessage();
- sendException(hasResult, clientMessage, message, serverConnection, e);
+ if (setLastResultReceived(resultSender)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
+ function), e);
+ String message = e.getMessage();
+ sendException(hasResult, clientMessage, message, serverConnection, e);
+ }
} finally {
handshake.setClientReadTimeout(earlierClientReadTimeout);
ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0);
}
}
+
private void sendException(byte hasResult, Message msg, String message,
ServerConnection serverConnection, Throwable e) throws IOException {
synchronized (msg) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65JUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65JUnitTest.java
new file mode 100644
index 0000000..90d94f8
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSender65JUnitTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.mockito.Mockito.mock;
+
+public class ServerToClientFunctionResultSender65JUnitTest
+ extends ServerToClientFunctionResultSenderJUnitTest {
+ // Reuses the parent class's tests, supplying a mock of the 65 sender instead of the base sender.
+ @Override
+ protected ServerToClientFunctionResultSender getResultSender() {
+ return mock(ServerToClientFunctionResultSender65.class);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSenderJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSenderJUnitTest.java
new file mode 100644
index 0000000..89ee3ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/ServerToClientFunctionResultSenderJUnitTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedMember;
+
+public class ServerToClientFunctionResultSenderJUnitTest {
+ // Factory for the sender used by the tests; protected so a subclass can supply a different sender type.
+ protected ServerToClientFunctionResultSender getResultSender() {
+ return mock(ServerToClientFunctionResultSender.class);
+ }
+ // NOTE(review): the sender is a plain Mockito mock, so the calls below look stubbed rather than real - confirm the guard is exercised.
+ @Test
+ public void whenLastResultReceivedIsSetThenLastResultMustReturnImmediately() {
+ ServerToClientFunctionResultSender resultSender = getResultSender();
+ resultSender.lastResultReceived = true;
+
+ Object object = mock(Object.class);
+ DistributedMember memberId = mock(DistributedMember.class);
+ resultSender.lastResult(object, memberId);
+ verify(resultSender, times(0)).isOkayToSendResult();
+
+ // Same expectation for the single-argument lastResult overload.
+ resultSender.lastResult(object);
+ verify(resultSender, times(0)).isOkayToSendResult();
+
+ }
+
+ @Test
+ public void whenLastResultReceivedIsSetThenSendResultMustReturnImmediately() {
+ ServerToClientFunctionResultSender resultSender = getResultSender();
+ resultSender.lastResultReceived = true;
+
+ Object object = mock(Object.class);
+ DistributedMember memberId = mock(DistributedMember.class);
+ resultSender.sendResult(object, memberId);
+ verify(resultSender, times(0)).isOkayToSendResult();
+
+ // Same expectation for the single-argument sendResult overload.
+ resultSender.sendResult(object);
+ verify(resultSender, times(0)).isOkayToSendResult();
+
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandJUnitTest.java
new file mode 100644
index 0000000..d8ff5c0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandJUnitTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
+import org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction61;
+import org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction65;
+import org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction66;
+
+@RunWith(JUnitParamsRunner.class)
+public class BaseCommandJUnitTest {
+  // Parameter source for every test below: the three ExecuteRegionFunction commands, cast to BaseCommand.
+  public BaseCommand[] getCommands() {
+    return new BaseCommand[] {(BaseCommand) ExecuteRegionFunction61.getCommand(),
+        (BaseCommand) ExecuteRegionFunction65
+            .getCommand(),
+        (BaseCommand) ExecuteRegionFunction66.getCommand()};
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsSetThenCheckAndSetLastResultSentIfValidMustReturnFalse(
+      BaseCommand baseCommand) {
+    // Flag already set on the sender: the check-and-set must report false.
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(true);
+    assertFalse(baseCommand.setLastResultReceived(resultSender));
+
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsNotSetThenCheckAndSetLastResultSentIfValidMustReturnTrue(
+      BaseCommand baseCommand) {
+    // Flag not yet set on the sender: the check-and-set must report true.
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(false);
+    assertTrue(baseCommand.setLastResultReceived(resultSender));
+
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsNotSetThenCheckAndSetLastResultSentIfValidMustSetIt(
+      BaseCommand baseCommand) {
+    // Flag not yet set: the command must set it on the sender exactly once.
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(false);
+    baseCommand.setLastResultReceived(resultSender);
+    verify(resultSender, times(1)).setLastResultReceived(true);
+
+  }
+
+  @Test
+  @Parameters(method = "getCommands")
+  public void whenLastReceivedIsSetThenCheckAndSetLastResultSentIfValidMustNotSetIt(
+      BaseCommand baseCommand) {
+    // Flag already set: the command must not set it again.
+    ServerToClientFunctionResultSender resultSender =
+        mock(ServerToClientFunctionResultSender.class);
+    when(resultSender.isLastResultReceived()).thenReturn(true);
+    baseCommand.setLastResultReceived(resultSender);
+    verify(resultSender, times(0)).setLastResultReceived(true);
+
+  }
+
+}