You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2020/08/08 00:08:44 UTC

[geode] branch support/1.13 updated: GEODE-8394: Rewind the message Part on command failure (#5424)

This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 0fe6518  GEODE-8394: Rewind the message Part on command failure (#5424)
0fe6518 is described below

commit 0fe6518851d7bdd85d544b8f3c0647ab9053c891
Author: agingade <ag...@pivotal.io>
AuthorDate: Fri Aug 7 10:12:27 2020 -0700

    GEODE-8394: Rewind the message Part on command failure (#5424)
    
    GEODE-8394: Rewind the message Part on failure
    
    Co-authored-by: anilkumar gingade <an...@anilg.local>
    (cherry picked from commit 83d1e28a953b7d73e7f499f9013540bedd0bd472)
---
 .../ClientServerCacheOperationDUnitTest.java       | 204 +++++++++++++++++++++
 .../geode/internal/cache/tier/sockets/Part.java    |  21 ++-
 .../internal/cache/tier/sockets/PartTest.java      | 105 +++++++++++
 3 files changed, 324 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java
new file mode 100644
index 0000000..4bfa3cc
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache30;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+/**
+ * Distributed tests for client put and get operations on values that are large relative to the
+ * read timeout configured on the client pool.
+ */
+@Category({ClientServerTest.class})
+public class ClientServerCacheOperationDUnitTest implements Serializable {
+
+  private String regionName = "CsTestRegion";
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  /**
+   * A put of a large value through a pool with a very short read timeout must fail with
+   * {@link ServerConnectivityException}; a value that still reached a server must be intact.
+   */
+  @Test
+  public void largeObjectPutWithReadTimeoutThrowsException() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
+
+    final int byteSize = 40 * 1000 * 1000;
+    final int listSize = 2;
+    final int locatorPort = DistributedTestUtils.getLocatorPort();
+
+    server1.invoke(() -> createServerCache());
+    server2.invoke(() -> createServerCache());
+
+    server1.invoke(() -> createReplicateRegion());
+    server2.invoke(() -> createReplicateRegion());
+
+    List<byte[]> list = new ArrayList<>(listSize);
+    for (int i = 0; i < listSize; i++) {
+      list.add(new byte[byteSize]);
+    }
+
+    client.invoke(() -> {
+      clientCacheRule.createClientCache();
+
+      Pool pool = PoolManager.createFactory()
+          .addLocator("localhost", locatorPort)
+          .setSocketBufferSize(50)
+          .setReadTimeout(40)
+          .setPingInterval(200)
+          .setSocketConnectTimeout(50)
+          .setServerConnectionTimeout(50)
+          .create("testPool");
+
+      Region<String, List<byte[]>> region = clientCacheRule.getClientCache()
+          .<String, List<byte[]>>createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .setPoolName(pool.getName())
+          .create(regionName);
+
+      assertThatThrownBy(() -> region.put("key", list))
+          .isInstanceOf(ServerConnectivityException.class);
+    });
+
+    server1.invoke(() -> {
+      Region<String, List<byte[]>> region = cacheRule.getCache().getRegion(regionName);
+      List<byte[]> value = region.get("key");
+      // The put may or may not have reached the server; if it did, the value must be intact.
+      if (value != null) {
+        assertThat(value.size()).isEqualTo(listSize);
+        value.forEach((b) -> assertThat(b.length).isEqualTo(byteSize));
+      }
+    });
+
+    client.invoke(() -> {
+      Region<String, List<byte[]>> region = clientCacheRule.getClientCache().getRegion(regionName);
+      assertThat(region.size()).isEqualTo(0);
+      List<byte[]> value = region.get("key");
+      if (value != null) {
+        assertThat(value.size()).isEqualTo(listSize);
+        value.forEach((b) -> assertThat(b.length).isEqualTo(byteSize));
+      }
+    });
+  }
+
+  /**
+   * A get of a large value through a pool with a short read timeout must still return the value.
+   */
+  @Test
+  public void largeObjectGetWithReadTimeout() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM server3 = VM.getVM(2);
+    VM client = VM.getVM(3);
+
+    final int locatorPort = DistributedTestUtils.getLocatorPort();
+
+    server1.invoke(() -> createServerCache());
+    server2.invoke(() -> createServerCache());
+    server3.invoke(() -> createServerCache());
+
+    server1.invoke(() -> createReplicateRegion());
+    server2.invoke(() -> createReplicateRegion());
+
+    server3.invoke(() -> {
+      createReplicateRegion();
+      Region<String, List<byte[]>> region = cacheRule.getCache().getRegion(regionName);
+
+      int listSize = 2;
+      List<byte[]> list = new ArrayList<>(listSize);
+
+      for (int i = 0; i < listSize; i++) {
+        list.add(new byte[75 * 1000 * 1000]);
+      }
+
+      region.put("key", list);
+    });
+
+    server1.invoke(() -> {
+      Region<?, ?> region = cacheRule.getCache().getRegion(regionName);
+
+      assertThat(region.size()).isEqualTo(1);
+    });
+
+    client.invoke(() -> {
+      clientCacheRule.createClientCache();
+
+      Pool pool = PoolManager.createFactory()
+          .addLocator("localhost", locatorPort)
+          .setSocketBufferSize(100)
+          .setReadTimeout(50)
+          .create("testPool");
+
+      Region<String, List<byte[]>> region = clientCacheRule.getClientCache()
+          .<String, List<byte[]>>createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .setPoolName(pool.getName())
+          .create(regionName);
+
+      region.get("key");
+      assertThat(region.size()).isEqualTo(0);
+
+      Object value = region.get("key");
+
+      assertThat(value).isInstanceOf(List.class);
+    });
+  }
+
+  /** Creates a cache in the current VM and starts a cache server configured with port 0. */
+  private void createServerCache() throws IOException {
+    cacheRule.createCache();
+    CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+  }
+
+  /** Creates the REPLICATE test region in the cache of the VM this is invoked in. */
+  private void createReplicateRegion() {
+    RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+    regionFactory.create(regionName);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
index f833f05..9654cee 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
@@ -401,8 +401,11 @@ public class Part {
         }
       } else {
         HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
-        hdos.sendTo(out, buf);
-        hdos.rewind();
+        try {
+          hdos.sendTo(out, buf);
+        } finally {
+          hdos.rewind();
+        }
       }
     }
   }
@@ -431,8 +434,11 @@ public class Part {
         }
       } else {
         HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
-        hdos.sendTo(buf);
-        hdos.rewind();
+        try {
+          hdos.sendTo(buf);
+        } finally {
+          hdos.rewind();
+        }
       }
     }
   }
@@ -497,8 +503,11 @@ public class Part {
         }
       } else {
         HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
-        hdos.sendTo(sc, buf);
-        hdos.rewind();
+        try {
+          hdos.sendTo(sc, buf);
+        } finally {
+          hdos.rewind();
+        }
       }
     }
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java
index d744218..2d882b2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java
@@ -16,16 +16,24 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.io.EOFException;
 import java.io.OutputStream;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 
 @Category({ClientServerTest.class})
@@ -76,4 +84,101 @@ public class PartTest {
     assertThatThrownBy(() -> part.getCachedString())
         .hasMessageContaining("expected String part to be of type BYTE, part =");
   }
+
+  /** An exception thrown by sendTo(OutputStream, ByteBuffer) must still rewind the stream. */
+  @Test
+  public void writeToOutputStreamResetsPartOnException() throws Exception {
+    OutputStream out = mock(OutputStream.class);
+    ByteBuffer buffer = mock(ByteBuffer.class);
+    HeapDataOutputStream hdos = mock(HeapDataOutputStream.class);
+    when(hdos.size()).thenReturn(1000);
+    doThrow(new EOFException("test")).when(hdos).sendTo(eq(out), eq(buffer));
+
+    Part part = new Part();
+    part.setPartState(hdos, false);
+
+    Throwable thrown = catchThrowable(() -> part.writeTo(out, buffer));
+
+    assertThat(thrown).isInstanceOf(EOFException.class);
+    verify(hdos).rewind();
+  }
+
+  /** A successful write to an OutputStream rewinds the stream exactly once. */
+  @Test
+  public void writeToOutputStreamResetsPartOnSuccess() throws Exception {
+    OutputStream out = mock(OutputStream.class);
+    ByteBuffer buffer = mock(ByteBuffer.class);
+    HeapDataOutputStream hdos = mock(HeapDataOutputStream.class);
+    when(hdos.size()).thenReturn(1000);
+    Part part = new Part();
+    part.setPartState(hdos, false);
+
+    part.writeTo(out, buffer);
+
+    verify(hdos).rewind();
+  }
+
+  /** An exception thrown by sendTo(ByteBuffer) must still rewind the stream. */
+  @Test
+  public void writeToByteBufferResetsPartOnException() throws Exception {
+    ByteBuffer buffer = mock(ByteBuffer.class);
+    HeapDataOutputStream hdos = mock(HeapDataOutputStream.class);
+    when(hdos.size()).thenReturn(1000);
+    doThrow(new BufferOverflowException()).when(hdos).sendTo(eq(buffer));
+    Part part = new Part();
+    part.setPartState(hdos, false);
+
+    Throwable thrown = catchThrowable(() -> part.writeTo(buffer));
+
+    assertThat(thrown).isInstanceOf(BufferOverflowException.class);
+    verify(hdos).rewind();
+  }
+
+  /** A successful write to a ByteBuffer rewinds the stream exactly once. */
+  @Test
+  public void writeToByteBufferResetsPartOnSuccess() throws Exception {
+    ByteBuffer buffer = mock(ByteBuffer.class);
+    HeapDataOutputStream hdos = mock(HeapDataOutputStream.class);
+    when(hdos.size()).thenReturn(1000);
+    Part part = new Part();
+    part.setPartState(hdos, false);
+
+    part.writeTo(buffer);
+
+    verify(hdos).rewind();
+  }
+
+  /** An exception thrown by sendTo(SocketChannel, ByteBuffer) must still rewind the stream. */
+  @Test
+  public void writeToSocketChannelResetsPartOnException() throws Exception {
+    SocketChannel channel = mock(SocketChannel.class);
+    ByteBuffer buffer = mock(ByteBuffer.class);
+    HeapDataOutputStream hdos = mock(HeapDataOutputStream.class);
+    when(hdos.size()).thenReturn(1000);
+    doThrow(new BufferOverflowException()).when(hdos).sendTo(eq(channel), eq(buffer));
+
+    Part part = new Part();
+    part.setPartState(hdos, false);
+
+    Throwable thrown = catchThrowable(() -> part.writeTo(channel, buffer));
+
+    assertThat(thrown).isInstanceOf(BufferOverflowException.class);
+    verify(hdos).rewind();
+  }
+
+  /** A successful write to a SocketChannel rewinds the stream exactly once. */
+  @Test
+  public void writeToSocketChannelResetsPartOnSuccess() throws Exception {
+    SocketChannel channel = mock(SocketChannel.class);
+    ByteBuffer buffer = mock(ByteBuffer.class);
+    HeapDataOutputStream hdos = mock(HeapDataOutputStream.class);
+    when(hdos.size()).thenReturn(1000);
+    Part part = new Part();
+    part.setPartState(hdos, false);
+
+    part.writeTo(channel, buffer);
+
+    verify(hdos).rewind();
+  }
+
 }