You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/27 17:24:41 UTC

[11/26] geode git commit: GEODE-2398: fix oplog corruption in overflow oplogs

GEODE-2398: fix oplog corruption in overflow oplogs

        * ported changes from original fix in Oplog.java to OverflowOplog.java

This closes #477


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d30aabbe
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d30aabbe
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d30aabbe

Branch: refs/heads/feature/GEM-1299
Commit: d30aabbe96bb98f6e3a94882c4b4ae5134c2de86
Parents: cdbf8ba
Author: Lynn Gallinat <lg...@pivotal.io>
Authored: Mon Apr 24 15:15:03 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Wed Apr 26 23:28:49 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/OverflowOplog.java     |  40 ++-
 .../geode/internal/cache/OplogFlushTest.java    |  42 ----
 .../internal/cache/OverflowOplogFlushTest.java  | 246 +++++++++++++++++++
 3 files changed, 279 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d30aabbe/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
index 78f0c00..65ea728 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
@@ -706,12 +706,15 @@ class OverflowOplog implements CompactableOplog, Flushable {
   // */
   // private long lastWritePos = -1;
 
-  // /**
-  // * test hook
-  // */
-  // public final ByteBuffer getWriteBuf() {
-  // return this.crf.writeBuf;
-  // }
+  /**
+   * test hook
+   */
+  public final ByteBuffer getWriteBuf() {
+    return this.crf.writeBuf;
+  }
+
+  private static final int MAX_CHANNEL_RETRIES = 5;
+
   @Override
   public final void flush() throws IOException {
     final OplogFile olf = this.crf;
@@ -724,8 +727,31 @@ class OverflowOplog implements CompactableOplog, Flushable {
         if (bb != null && bb.position() != 0) {
           bb.flip();
           int flushed = 0;
+          int numChannelRetries = 0;
           do {
-            flushed += olf.channel.write(bb);
+            int channelBytesWritten = 0;
+            final int bbStartPos = bb.position();
+            final long channelStartPos = olf.channel.position();
+            // differentiate between bytes written on this channel.write() iteration and the
+            // total number of bytes written to the channel on this call
+            channelBytesWritten = olf.channel.write(bb);
+            // Expect channelBytesWritten and the changes in bb.position() and channel.position() to
+            // be the same. If they are not, then the channel.write() silently failed. The following
+            // retry separates spurious failures from permanent channel failures.
+            if (channelBytesWritten != bb.position() - bbStartPos) {
+              if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
+                // Reset the ByteBuffer position, but take into account anything that did get
+                // written to the channel
+                channelBytesWritten = (int) (olf.channel.position() - channelStartPos);
+                bb.position(bbStartPos + channelBytesWritten);
+              } else {
+                throw new IOException("Failed to write Oplog entry to " + olf.f.getName() + ": "
+                    + "channel.write() returned " + channelBytesWritten + ", "
+                    + "change in channel position = " + (olf.channel.position() - channelStartPos)
+                    + ", " + "change in source buffer position = " + (bb.position() - bbStartPos));
+              }
+            }
+            flushed += channelBytesWritten;
           } while (bb.hasRemaining());
           // update bytesFlushed after entire writeBuffer is flushed to fix bug 41201
           olf.bytesFlushed += flushed;

http://git-wip-us.apache.org/repos/asf/geode/blob/d30aabbe/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
index 7f4f056..0b25e9f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
@@ -222,33 +222,6 @@ public class OplogFlushTest extends DiskRegionTestingBase {
     }
   }
 
-  private void doPartialChannelByteArrayFlushForOverflowOpLog(OverflowOplog oplog)
-      throws IOException {
-    OverflowOplog ol = oplog;
-    FileChannel ch = ol.getFileChannel();
-    FileChannel spyCh = spy(ch);
-    ol.testSetCrfChannel(spyCh);
-
-    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
-    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115,
-        116, 117, 118, 119};
-
-    bbArray[0] = bb1 = ByteBuffer.allocate(entry1.length).put(entry1);
-    bbArray[1] = bb2 = ByteBuffer.allocate(entry2.length).put(entry2);
-
-    try {
-      // Set fake channel, that pretends to write partial data.
-      doAnswer(new FakeChannelWriteArrayBB()).when(spyCh).write(bbArray);
-
-      bb2.flip();
-      ol.flush(bb1, bb2);
-      assertEquals("Incomplete flush calls.", 4, nFakeChannelWrites);
-
-    } finally {
-      region.destroyRegion();
-    }
-  }
-
   @Test
   public void testOplogByteArrayFlush() throws Exception {
     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, Scope.LOCAL);
@@ -260,19 +233,4 @@ public class OplogFlushTest extends DiskRegionTestingBase {
     doPartialChannelByteArrayFlushForOpLog(oplogs);
   }
 
-  @Test
-  public void testOverflowOplogByteArrayFlush() throws Exception {
-    DiskRegionProperties props = new DiskRegionProperties();
-    props.setOverFlowCapacity(1);
-    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
-    region.put("K1", "v1");
-    region.put("K2", "v2");
-
-    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    OverflowOplog oplog = dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
-    assertNotNull("Unexpected null Oplog", oplog);
-
-    doPartialChannelByteArrayFlushForOverflowOpLog(oplog);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/d30aabbe/geode-core/src/test/java/org/apache/geode/internal/cache/OverflowOplogFlushTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OverflowOplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OverflowOplogFlushTest.java
new file mode 100644
index 0000000..5ae8a40
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OverflowOplogFlushTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Testing recovery from failures writing OverflowOplog entries
+ */
+@Category(IntegrationTest.class)
+public class OverflowOplogFlushTest extends DiskRegionTestingBase {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  // How many times to fake the write failures
+  private int nFakeChannelWrites = 0;
+  private OverflowOplog ol = null;
+  private ByteBuffer bb1 = null;
+  private ByteBuffer bb2 = null;
+  private ByteBuffer[] bbArray = new ByteBuffer[2];
+  private FileChannel ch;
+  private FileChannel spyCh;
+
+  @Rule
+  public TestName name = new TestName();
+
+  class FakeChannelWriteBB implements Answer<Integer> {
+
+    @Override
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      return fakeWriteBB(ol, bb1);
+    }
+  }
+
+  private int fakeWriteBB(OverflowOplog ol, ByteBuffer bb) throws IOException {
+    if (nFakeChannelWrites > 0) {
+      bb.position(bb.limit());
+      --nFakeChannelWrites;
+      return 0;
+    }
+    doCallRealMethod().when(spyCh).write(bb);
+    return spyCh.write(bb);
+  }
+
+  private void verifyBB(ByteBuffer bb, byte[] src) {
+    bb.flip();
+    for (int i = 0; i < src.length; ++i) {
+      assertEquals("Channel contents does not match expected at index " + i, src[i], bb.get());
+    }
+  }
+
+  class FakeChannelWriteArrayBB implements Answer<Integer> {
+    @Override
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      System.out.println("### in FakeChannelWriteArrayBB.answer :");
+      return fakeWriteArrayBB(bbArray);
+    }
+  }
+
+  /**
+   * This method tries to write half of the byte buffer to the channel.
+   */
+  private int fakeWriteArrayBB(ByteBuffer[] bbArray) throws IOException {
+    nFakeChannelWrites++;
+    for (ByteBuffer b : bbArray) {
+      int numFakeWrite = b.limit() / 2;
+      if (b.position() <= 0) {
+        b.position(numFakeWrite);
+        return numFakeWrite;
+      } else if (b.position() == numFakeWrite) {
+        b.position(b.limit());
+        return b.limit() - numFakeWrite;
+      }
+    }
+    return 0;
+  }
+
+  private void doChannelFlushWithFailures(OverflowOplog oplog, int numFailures) throws IOException {
+    nFakeChannelWrites = numFailures;
+    ol = oplog;
+    ch = ol.getFileChannel();
+    spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115,
+        116, 117, 118, 119};
+
+    bb1 = ol.getWriteBuf();
+    try {
+      // Force channel.write() failures when writing the first entry
+      doAnswer(new FakeChannelWriteBB()).when(spyCh).write(bb1);
+      long chStartPos = ol.getFileChannel().position();
+      bb1.clear();
+      bb1.put(entry1);
+      ol.flush();
+
+      // Write the 2nd entry without forced channel failures
+      nFakeChannelWrites = 0;
+      bb1 = ol.getWriteBuf();
+      bb1.clear();
+      bb1.put(entry2);
+      ol.flush();
+      long chEndPos = ol.getFileChannel().position();
+      assertEquals("Change in channel position does not equal the size of the data flushed",
+          entry1.length + entry2.length, chEndPos - chStartPos);
+      ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length);
+      ol.getFileChannel().position(chStartPos);
+      ol.getFileChannel().read(dst);
+      verifyBB(dst, entry1);
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testAsyncChannelWriteRetriesOnFailureDuringFlush() throws Exception {
+    DiskRegionProperties props = new DiskRegionProperties();
+    props.setOverFlowCapacity(1);
+    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
+    region.put("K1", "v1"); // add two entries to make it overflow
+    region.put("K2", "v2");
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    OverflowOplog oplog = dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
+    assertNotNull("Unexpected null Oplog for " + dr.getName(), oplog);
+    doChannelFlushWithFailures(oplog, 1 /* write failure */);
+  }
+
+  @Test
+  public void testChannelWriteRetriesOnFailureDuringFlush() throws Exception {
+    DiskRegionProperties props = new DiskRegionProperties();
+    props.setOverFlowCapacity(1);
+    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
+    region.put("K1", "v1"); // add two entries to make it overflow
+    region.put("K2", "v2");
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    OverflowOplog oplog = dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
+    assertNotNull("Unexpected null Oplog for " + dr.getName(), oplog);
+    doChannelFlushWithFailures(oplog, 1 /* write failure */);
+  }
+
+  @Test
+  public void testChannelRecoversFromWriteFailureRepeatedRetriesDuringFlush() throws Exception {
+    DiskRegionProperties props = new DiskRegionProperties();
+    props.setOverFlowCapacity(1);
+    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
+    region.put("K1", "v1"); // add two entries to make it overflow
+    region.put("K2", "v2");
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    OverflowOplog oplog = dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
+    assertNotNull("Unexpected null Oplog for " + dr.getName(), oplog);
+
+    doChannelFlushWithFailures(oplog, 3 /* write failures */);
+  }
+
+  @Test
+  public void testOplogFlushThrowsIOExceptioniWhenNumberOfChannelWriteRetriesExceedsLimit()
+      throws Exception {
+    expectedException.expect(IOException.class);
+    DiskRegionProperties props = new DiskRegionProperties();
+    props.setOverFlowCapacity(1);
+    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
+    region.put("K1", "v1"); // add two entries to make it overflow
+    region.put("K2", "v2");
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    OverflowOplog oplog = dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
+    assertNotNull("Unexpected null Oplog for " + dr.getName(), oplog);
+
+    doChannelFlushWithFailures(oplog, 6 /* exceeds the retry limit in Oplog */);
+  }
+
+  private void doPartialChannelByteArrayFlushForOverflowOpLog(OverflowOplog oplog)
+      throws IOException {
+    OverflowOplog ol = oplog;
+    FileChannel ch = ol.getFileChannel();
+    FileChannel spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115,
+        116, 117, 118, 119};
+
+    bbArray[0] = bb1 = ByteBuffer.allocate(entry1.length).put(entry1);
+    bbArray[1] = bb2 = ByteBuffer.allocate(entry2.length).put(entry2);
+
+    try {
+      // Set fake channel, that pretends to write partial data.
+      doAnswer(new FakeChannelWriteArrayBB()).when(spyCh).write(bbArray);
+
+      bb2.flip();
+      ol.flush(bb1, bb2);
+      assertEquals("Incomplete flush calls.", 4, nFakeChannelWrites);
+
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testOverflowOplogByteArrayFlush() throws Exception {
+    DiskRegionProperties props = new DiskRegionProperties();
+    props.setOverFlowCapacity(1);
+    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
+    region.put("K1", "v1");
+    region.put("K2", "v2");
+
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    OverflowOplog oplog = dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
+    assertNotNull("Unexpected null Oplog", oplog);
+
+    doPartialChannelByteArrayFlushForOverflowOpLog(oplog);
+  }
+}