You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2020/08/08 00:08:44 UTC
[geode] branch support/1.13 updated: GEODE-8394: Rewind the message
Part on command failure (#5424)
This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 0fe6518 GEODE-8394: Rewind the message Part on command failure (#5424)
0fe6518 is described below
commit 0fe6518851d7bdd85d544b8f3c0647ab9053c891
Author: agingade <ag...@pivotal.io>
AuthorDate: Fri Aug 7 10:12:27 2020 -0700
GEODE-8394: Rewind the message Part on command failure (#5424)
GEODE-8394: Rewind the message Part on failure
Co-authored-by: anilkumar gingade <an...@anilg.local>
(cherry picked from commit 83d1e28a953b7d73e7f499f9013540bedd0bd472)
---
.../ClientServerCacheOperationDUnitTest.java | 204 +++++++++++++++++++++
.../geode/internal/cache/tier/sockets/Part.java | 21 ++-
.../internal/cache/tier/sockets/PartTest.java | 105 +++++++++++
3 files changed, 324 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java
new file mode 100644
index 0000000..4bfa3cc
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache30;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+@Category({ClientServerTest.class})
+public class ClientServerCacheOperationDUnitTest implements Serializable {
+
+ private String regionName = "CsTestRegion";
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Test
+ public void largeObjectPutWithReadTimeoutThrowsException() {
+ VM server1 = VM.getVM(0);
+ VM server2 = VM.getVM(1);
+ VM client = VM.getVM(2);
+
+ final int byteSize = 40 * 1000 * 1000;
+ final int listSize = 2;
+ final int locatorPort = DistributedTestUtils.getLocatorPort();
+
+ server1.invoke(() -> createServerCache());
+ server2.invoke(() -> createServerCache());
+
+ server1.invoke(() -> {
+ RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+ regionFactory.create(regionName);
+ });
+
+ server2.invoke(() -> {
+ RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+ regionFactory.create(regionName);
+ });
+
+ List<byte[]> list = new ArrayList(listSize);
+
+ for (int i = 0; i < listSize; i++) {
+ list.add(new byte[byteSize]);
+ }
+
+ client.invoke(() -> {
+ clientCacheRule.createClientCache();
+
+ Pool pool = PoolManager.createFactory()
+ .addLocator("localhost", locatorPort)
+ .setSocketBufferSize(50)
+ .setReadTimeout(40)
+ .setPingInterval(200)
+ .setSocketConnectTimeout(50)
+ .setServerConnectionTimeout(50)
+ .create("testPool");
+
+ Region region = clientCacheRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .setPoolName(pool.getName())
+ .create(regionName);
+
+ assertThatThrownBy(() -> region.put("key", list))
+ .isInstanceOf(ServerConnectivityException.class);
+
+ });
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ List value = (List) region.get("key");
+ if (value != null) {
+ assertThat(value.size()).isEqualTo(listSize);
+ list.forEach((b) -> assertThat(b.length).isEqualTo(byteSize));
+ }
+ });
+
+ client.invoke(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ assertThat(region.size()).isEqualTo(0);
+ List value = (List) region.get("key");
+ if (value != null) {
+ assertThat(value.size()).isEqualTo(listSize);
+ list.forEach((b) -> assertThat(b.length).isEqualTo(byteSize));
+ }
+ });
+
+ }
+
+ @Test
+ public void largeObjectGetWithReadTimeout() {
+ VM server1 = VM.getVM(0);
+ VM server2 = VM.getVM(1);
+ VM server3 = VM.getVM(2);
+ VM client = VM.getVM(3);
+
+ final int locatorPort = DistributedTestUtils.getLocatorPort();
+
+ server1.invoke(() -> createServerCache());
+ server2.invoke(() -> createServerCache());
+ server3.invoke(() -> createServerCache());
+
+ server1.invoke(() -> {
+ RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+ regionFactory.create(regionName);
+ });
+
+ server2.invoke(() -> {
+ RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+ regionFactory.create(regionName);
+ });
+
+ server3.invoke(() -> {
+ RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+ Region region = regionFactory.create(regionName);
+
+ int listSize = 2;
+ List list = new ArrayList(listSize);
+
+ for (int i = 0; i < listSize; i++) {
+ list.add(new byte[75 * 1000 * 1000]);
+ }
+
+ region.put("key", list);
+ });
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+
+ assertThat(region.size()).isEqualTo(1);
+ });
+
+ client.invoke(() -> {
+ clientCacheRule.createClientCache();
+
+ Pool pool = PoolManager.createFactory()
+ .addLocator("localhost", locatorPort)
+ .setSocketBufferSize(100)
+ .setReadTimeout(50)
+ .create("testPool");
+
+ Region region = clientCacheRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .setPoolName(pool.getName())
+ .create(regionName);
+
+ region.get("key");
+ assertThat(region.size()).isEqualTo(0);
+
+ Object value = region.get("key");
+
+ assertThat(value).isInstanceOf(List.class);
+ });
+
+ }
+
+ private void createServerCache() throws IOException {
+ cacheRule.createCache();
+ CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+ cacheServer.setPort(0);
+ cacheServer.start();
+ }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
index f833f05..9654cee 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
@@ -401,8 +401,11 @@ public class Part {
}
} else {
HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
- hdos.sendTo(out, buf);
- hdos.rewind();
+ try {
+ hdos.sendTo(out, buf);
+ } finally {
+ hdos.rewind();
+ }
}
}
}
@@ -431,8 +434,11 @@ public class Part {
}
} else {
HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
- hdos.sendTo(buf);
- hdos.rewind();
+ try {
+ hdos.sendTo(buf);
+ } finally {
+ hdos.rewind();
+ }
}
}
}
@@ -497,8 +503,11 @@ public class Part {
}
} else {
HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
- hdos.sendTo(sc, buf);
- hdos.rewind();
+ try {
+ hdos.sendTo(sc, buf);
+ } finally {
+ hdos.rewind();
+ }
}
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java
index d744218..2d882b2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java
@@ -16,16 +16,24 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.io.EOFException;
import java.io.OutputStream;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.test.junit.categories.ClientServerTest;
@Category({ClientServerTest.class})
@@ -76,4 +84,101 @@ public class PartTest {
assertThatThrownBy(() -> part.getCachedString())
.hasMessageContaining("expected String part to be of type BYTE, part =");
}
+
+ @Test
+ public void writeToOutputStreamResetsPartOnException() throws Exception {
+ HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class);
+ when(heapDataOutputStream.size()).thenReturn(1000);
+ OutputStream outputStream = mock(OutputStream.class);
+ ByteBuffer byteBuffer = mock(ByteBuffer.class);
+ doThrow(new EOFException("test")).when(heapDataOutputStream).sendTo(eq(outputStream),
+ eq(byteBuffer));
+
+ Part part = new Part();
+ part.setPartState(heapDataOutputStream, false);
+
+ Throwable thrown = catchThrowable(() -> part.writeTo(outputStream, byteBuffer));
+
+ assertThat(thrown).isInstanceOf(EOFException.class);
+ verify(heapDataOutputStream, times(1)).rewind();
+ }
+
+ @Test
+ public void writeToOutputStreamResetsPartOnSuccess() throws Exception {
+ HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class);
+ when(heapDataOutputStream.size()).thenReturn(1000);
+ OutputStream outputStream = mock(OutputStream.class);
+ ByteBuffer byteBuffer = mock(ByteBuffer.class);
+
+ Part part = new Part();
+ part.setPartState(heapDataOutputStream, false);
+
+ part.writeTo(outputStream, byteBuffer);
+
+ verify(heapDataOutputStream, times(1)).rewind();
+ }
+
+ @Test
+ public void writeToByteBufferResetsPartOnException() throws Exception {
+ HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class);
+ when(heapDataOutputStream.size()).thenReturn(1000);
+ ByteBuffer byteBuffer = mock(ByteBuffer.class);
+ doThrow(new BufferOverflowException()).when(heapDataOutputStream).sendTo(eq(byteBuffer));
+
+ Part part = new Part();
+ part.setPartState(heapDataOutputStream, false);
+
+ Throwable thrown = catchThrowable(() -> part.writeTo(byteBuffer));
+
+ assertThat(thrown).isInstanceOf(BufferOverflowException.class);
+ verify(heapDataOutputStream, times(1)).rewind();
+ }
+
+ @Test
+ public void writeToByteBufferResetsPartOnSuccess() throws Exception {
+ HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class);
+ when(heapDataOutputStream.size()).thenReturn(1000);
+ ByteBuffer byteBuffer = mock(ByteBuffer.class);
+
+ Part part = new Part();
+ part.setPartState(heapDataOutputStream, false);
+
+ part.writeTo(byteBuffer);
+
+ verify(heapDataOutputStream, times(1)).rewind();
+ }
+
+ @Test
+ public void writeToSocketChannelResetsPartOnException() throws Exception {
+ HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class);
+ when(heapDataOutputStream.size()).thenReturn(1000);
+ SocketChannel socketChannel = mock(SocketChannel.class);
+ ByteBuffer byteBuffer = mock(ByteBuffer.class);
+ doThrow(new BufferOverflowException()).when(heapDataOutputStream).sendTo(eq(socketChannel),
+ eq(byteBuffer));
+
+ Part part = new Part();
+ part.setPartState(heapDataOutputStream, false);
+
+ Throwable thrown = catchThrowable(() -> part.writeTo(socketChannel, byteBuffer));
+
+ assertThat(thrown).isInstanceOf(BufferOverflowException.class);
+ verify(heapDataOutputStream, times(1)).rewind();
+ }
+
+ @Test
+ public void writeToSocketChannelResetsPartOnSuccess() throws Exception {
+ HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class);
+ when(heapDataOutputStream.size()).thenReturn(1000);
+ SocketChannel socketChannel = mock(SocketChannel.class);
+ ByteBuffer byteBuffer = mock(ByteBuffer.class);
+
+ Part part = new Part();
+ part.setPartState(heapDataOutputStream, false);
+
+ part.writeTo(socketChannel, byteBuffer);
+
+ verify(heapDataOutputStream, times(1)).rewind();
+ }
+
}