You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2017/05/15 19:55:33 UTC

geode git commit: GEODE-2865 data loss in initial-image replication with multicast

Repository: geode
Updated Branches:
  refs/heads/develop fe41ed76b -> 614031725


GEODE-2865 data loss in initial-image replication with multicast

The state-flush algorithm relies on MembershipManager.waitForMessageState()
to ensure that all operations have been received and applied to the cache
prior to state replication starting.  For multicast there was a flaw in
the algorithm caused by two things: 1) cache operations were being sent
out-of-band, allowing them to be processed out of order with the state-
flush message, and 2) JGroupsMessenger was only waiting for the messages
to be dispatched by NAKACK2, which isn't necessarily the same as being
dispatched to the DistributionManager Executor that processes the message.

Cache operation messages are now sent in-band.

JGroupsMessenger now tracks NAKACK2 (multicast) sequence numbers of
messages dispatched to the DistributionManager and this is used in
waitForMessageState() to make sure the messages have been queued.
If multicast is enabled we now flush the serial executor in
waitForMessageState() to make sure that all messages queued in it have
been applied to the region.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/61403172
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/61403172
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/61403172

Branch: refs/heads/develop
Commit: 614031725360a66fdb726dd13136002f35ac6b24
Parents: fe41ed7
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon May 15 12:53:01 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon May 15 12:54:26 2017 -0700

----------------------------------------------------------------------
 .../gms/messenger/JGroupsMessenger.java         | 196 ++++++++++++-------
 .../gms/mgr/GMSMembershipManager.java           |  60 ++++--
 .../internal/cache/StateFlushOperation.java     |   0
 .../distributed/DistributedSystemDUnitTest.java |  10 +-
 .../messenger/JGroupsMessengerJUnitTest.java    |  31 +--
 .../cache/FixedPRSinglehopDUnitTest.java        |   2 +-
 6 files changed, 186 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/61403172/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index e99eff2..bfc8b61 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -15,61 +15,26 @@
 package org.apache.geode.distributed.internal.membership.gms.messenger;
 
 import static org.apache.geode.distributed.internal.membership.gms.GMSUtil.replaceStrings;
-import static org.apache.geode.internal.DataSerializableFixedID.JOIN_REQUEST;
-import static org.apache.geode.internal.DataSerializableFixedID.JOIN_RESPONSE;
 import static org.apache.geode.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ;
 import static org.apache.geode.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP;
+import static org.apache.geode.internal.DataSerializableFixedID.JOIN_REQUEST;
+import static org.apache.geode.internal.DataSerializableFixedID.JOIN_RESPONSE;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import org.apache.logging.log4j.Logger;
-import org.jgroups.Address;
-import org.jgroups.Event;
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.Message.Flag;
-import org.jgroups.Message.TransientFlag;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.View;
-import org.jgroups.ViewId;
-import org.jgroups.conf.ClassConfigurator;
-import org.jgroups.protocols.UDP;
-import org.jgroups.protocols.pbcast.NAKACK2;
-import org.jgroups.stack.IpAddress;
-import org.jgroups.util.Digest;
-import org.jgroups.util.UUID;
-
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.ForcedDisconnectException;
 import org.apache.geode.GemFireConfigException;
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.SystemConnectException;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.DurableClientAttributes;
-import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionStats;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MemberAttributes;
 import org.apache.geode.distributed.internal.membership.NetView;
@@ -86,7 +51,6 @@ import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.OSProcess;
-import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
@@ -95,9 +59,50 @@ import org.apache.geode.internal.cache.DistributedCacheOperation;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.log4j.AlertAppender;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.tcp.MemberShunnedException;
+import org.apache.logging.log4j.Logger;
+import org.jgroups.Address;
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Message.Flag;
+import org.jgroups.Message.TransientFlag;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.View;
+import org.jgroups.ViewId;
+import org.jgroups.conf.ClassConfigurator;
+import org.jgroups.protocols.UDP;
+import org.jgroups.protocols.pbcast.NAKACK2;
+import org.jgroups.protocols.pbcast.NakAckHeader2;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.util.Digest;
+import org.jgroups.util.UUID;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 
 @SuppressWarnings("StatementWithEmptyBody")
@@ -139,6 +144,11 @@ public class JGroupsMessenger implements Messenger {
 
   protected final AtomicLong pongsReceived = new AtomicLong(0);
 
+  /** tracks multicast messages that have been scheduled for processing */
+  protected final Map<DistributedMember, MessageTracker> scheduledMcastSeqnos = new HashMap<>();
+
+  protected short nackack2HeaderId;
+
   /**
    * A set that contains addresses that we have logged JGroups IOExceptions for in the current
    * membership view and possibly initiated suspect processing. This reduces the amount of suspect
@@ -323,6 +333,8 @@ public class JGroupsMessenger implements Messenger {
     Transport transport = (Transport) myChannel.getProtocolStack().getTransport();
     transport.setMessenger(this);
 
+    nackack2HeaderId = ClassConfigurator.getProtocolId(NAKACK2.class);
+
     try {
       myChannel.setReceiver(null);
       myChannel.setReceiver(new JGroupsReceiver());
@@ -401,6 +413,14 @@ public class JGroupsMessenger implements Messenger {
     if (encrypt != null) {
       encrypt.installView(v);
     }
+    synchronized (scheduledMcastSeqnos) {
+      for (DistributedMember mbr : v.getCrashedMembers()) {
+        scheduledMcastSeqnos.remove(mbr);
+      }
+      for (DistributedMember mbr : v.getShutdownMembers()) {
+        scheduledMcastSeqnos.remove(mbr);
+      }
+    }
   }
 
 
@@ -556,48 +576,39 @@ public class JGroupsMessenger implements Messenger {
   @Override
   public void waitForMessageState(InternalDistributedMember sender, Map state)
       throws InterruptedException {
-    NAKACK2 nakack = (NAKACK2) myChannel.getProtocolStack().findProtocol("NAKACK2");
     Long seqno = (Long) state.get("JGroups.mcastState");
-    if (nakack != null && seqno != null) {
-      waitForMessageState(nakack, sender, seqno);
+    if (seqno == null) {
+      return;
     }
-  }
-
-  /**
-   * wait for the mcast state from the given member to reach the given seqno
-   */
-  protected void waitForMessageState(NAKACK2 nakack, InternalDistributedMember sender, Long seqno)
-      throws InterruptedException {
     long timeout = services.getConfig().getDistributionConfig().getAckWaitThreshold() * 1000L;
     long startTime = System.currentTimeMillis();
     long warnTime = startTime + timeout;
     long quitTime = warnTime + timeout - 1000L;
     boolean warned = false;
 
-    JGAddress jgSender = new JGAddress(sender);
-
     for (;;) {
-      Digest digest = nakack.getDigest(jgSender);
-      if (digest == null) {
-        return;
-      }
       String received = "none";
-      long[] senderSeqnos = digest.get(jgSender);
-      if (senderSeqnos == null) {
-        break;
+      long highSeqno = 0;
+      synchronized (scheduledMcastSeqnos) {
+        MessageTracker tracker = scheduledMcastSeqnos.get(sender);
+        if (tracker == null) { // no longer in the membership view
+          break;
+        }
+        highSeqno = tracker.get();
       }
+
       if (logger.isDebugEnabled()) {
         logger.debug(
             "waiting for multicast messages from {}.  Current seqno={} and expected seqno={}",
-            sender, senderSeqnos[0], seqno);
+            sender, highSeqno, seqno);
       }
-      if (senderSeqnos[0] >= seqno.longValue()) {
+      if (highSeqno >= seqno.longValue()) {
         break;
       }
       long now = System.currentTimeMillis();
       if (!warned && now >= warnTime) {
         warned = true;
-        received = String.valueOf(senderSeqnos[0]);
+        received = String.valueOf(highSeqno);
         logger.warn(
             "{} seconds have elapsed while waiting for multicast messages from {}.  Received {} but expecting at least {}.",
             Long.toString((warnTime - startTime) / 1000L), sender, received, seqno);
@@ -931,18 +942,17 @@ public class JGroupsMessenger implements Messenger {
   }
 
   void setMessageFlags(DistributionMessage gfmsg, Message msg) {
-    // GemFire uses its own reply processors so there is no need
-    // to maintain message order
-    msg.setFlag(Flag.OOB);
     // Bundling is mostly only useful if we're doing no-ack work,
     // which is fairly rare
     msg.setFlag(Flag.DONT_BUNDLE);
 
     if (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
         || gfmsg instanceof HighPriorityDistributionMessage || AlertAppender.isThreadAlerting()) {
+      msg.setFlag(Flag.OOB);
       msg.setFlag(Flag.NO_FC);
       msg.setFlag(Flag.SKIP_BARRIER);
     }
+
     if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
       // we don't want to see our own cache operation messages
       msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
@@ -1281,18 +1291,40 @@ public class JGroupsMessenger implements Messenger {
         msg.setBytesRead(jgmsg.getLength());
 
         try {
-          logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
+
+          if (logger.isTraceEnabled()) {
+            logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
+          }
           filterIncomingMessage(msg);
           getMessageHandler(msg).processMessage(msg);
+
+          // record the scheduling of broadcast messages
+          NakAckHeader2 header = (NakAckHeader2) jgmsg.getHeader(nackack2HeaderId);
+          if (header != null && !jgmsg.isFlagSet(Flag.OOB)) {
+            recordScheduledSeqno(msg.getSender(), header.getSeqno());
+          }
+
         } catch (MemberShunnedException e) {
           // message from non-member - ignore
         }
+
       } finally {
         long delta = DistributionStats.getStatTime() - startTime;
         JGroupsMessenger.this.services.getStatistics().incUDPDispatchRequestTime(delta);
       }
     }
 
+    private void recordScheduledSeqno(DistributedMember member, long seqno) {
+      synchronized (scheduledMcastSeqnos) {
+        MessageTracker counter = scheduledMcastSeqnos.get(member);
+        if (counter == null) {
+          counter = new MessageTracker(seqno);
+          scheduledMcastSeqnos.put(member, counter);
+        }
+        counter.record(seqno);
+      }
+    }
+
     /**
      * returns the handler that should process the given message. The default handler is the
      * membership manager
@@ -1383,4 +1415,22 @@ public class JGroupsMessenger implements Messenger {
       }
     }
   }
+
+  static class MessageTracker {
+    long highestSeqno;
+
+    MessageTracker(long seqno) {
+      highestSeqno = seqno;
+    }
+
+    long get() {
+      return highestSeqno;
+    }
+
+    void record(long seqno) {
+      if (seqno > highestSeqno) {
+        highestSeqno = seqno;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/61403172/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index e920e76..8cdd6a5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -2199,6 +2199,12 @@ public class GMSMembershipManager implements MembershipManager, Manager {
       dc.waitForChannelState(otherMember, state);
     }
     services.getMessenger().waitForMessageState((InternalDistributedMember) otherMember, state);
+
+    if (services.getConfig().getTransport().isMcastEnabled()
+        && !services.getConfig().getDistributionConfig().getDisableTcp()) {
+      // GEODE-2865: wait for scheduled multicast messages to be applied to the cache
+      waitForSerialMessageProcessing((InternalDistributedMember) otherMember);
+    }
   }
 
   /*
@@ -2242,27 +2248,8 @@ public class GMSMembershipManager implements MembershipManager, Manager {
         }
       }
       if (!wait) {
-        // run a message through the member's serial execution queue to ensure that all of its
-        // current messages have been processed
-        OverflowQueueWithDMStats serialQueue = listener.getDM().getSerialQueue(idm);
-        if (serialQueue != null) {
-          final boolean done[] = new boolean[1];
-          final FlushingMessage msg = new FlushingMessage(done);
-          serialQueue.add(new SizeableRunnable(100) {
-            public void run() {
-              msg.invoke();
-            }
-
-            public String toString() {
-              return "Processing fake message";
-            }
-          });
-          synchronized (done) {
-            while (!done[0]) {
-              done.wait(10);
-            }
-            result = true;
-          }
+        if (waitForSerialMessageProcessing(idm)) {
+          result = true;
         }
       }
       if (wait) {
@@ -2281,6 +2268,37 @@ public class GMSMembershipManager implements MembershipManager, Manager {
     return result;
   }
 
+  /**
+   * wait for serial executor messages from the given member to be processed
+   */
+  public boolean waitForSerialMessageProcessing(InternalDistributedMember idm)
+      throws InterruptedException {
+    // run a message through the member's serial execution queue to ensure that all of its
+    // current messages have been processed
+    boolean result = false;
+    OverflowQueueWithDMStats serialQueue = listener.getDM().getSerialQueue(idm);
+    if (serialQueue != null) {
+      final boolean done[] = new boolean[1];
+      final FlushingMessage msg = new FlushingMessage(done);
+      serialQueue.add(new SizeableRunnable(100) {
+        public void run() {
+          msg.invoke();
+        }
+
+        public String toString() {
+          return "Processing fake message";
+        }
+      });
+      synchronized (done) {
+        while (!done[0]) {
+          done.wait(10);
+        }
+        result = true;
+      }
+    }
+    return result;
+  }
+
 
   // TODO GEODE-1752 rewrite this to get rid of the latches, which are currently a memory leak
   public boolean waitForNewMember(InternalDistributedMember remoteId) {

http://git-wip-us.apache.org/repos/asf/geode/blob/61403172/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/geode/blob/61403172/geode-core/src/test/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
index 9a64f53..2e9550a 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
@@ -28,6 +28,7 @@ import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
 import static org.assertj.core.api.Assertions.*;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -109,7 +110,7 @@ public class DistributedSystemDUnitTest extends JUnit4DistributedTestCase {
 
     // schedule a message in order to create a queue for the fake member
     DistributionManager distributionManager = (DistributionManager) system.getDistributionManager();
-    FakeMessage message = new FakeMessage(null);
+    final FakeMessage message = new FakeMessage(null);
 
     distributionManager.getExecutor(SERIAL_EXECUTOR, member).execute(new SizeableRunnable(100) {
 
@@ -124,10 +125,9 @@ public class DistributedSystemDUnitTest extends JUnit4DistributedTestCase {
       }
     });
 
-    // TODO: assert that queue was created or this test is just broken
-
-    assertThat(distributionManager.getMembershipManager().waitForDeparture(member))
-        .as("expected the serial queue to be flushed").isTrue();
+    Assert.assertTrue("expected the serial queue to be flushed",
+        distributionManager.getMembershipManager().waitForDeparture(member));
+    Assert.assertTrue(message.processed);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/61403172/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 307b594..92982e8 100755
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -896,18 +896,19 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testWaitForMessageStateSucceeds() throws Exception {
     initMocks(true/* multicast */);
-    NAKACK2 nakack = mock(NAKACK2.class);
-    Digest digest = mock(Digest.class);
-    when(nakack.getDigest(any(Address.class))).thenReturn(digest);
-    when(digest.get(any(Address.class))).thenReturn(new long[] {0, 0}, new long[] {2, 50},
-        new long[] {49, 50}, new long[] {50, 80}, new long[] {80, 120});
-    messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
-    verify(digest, times(4)).get(isA(Address.class));
-
-    reset(digest);
-    when(digest.get(any(Address.class))).thenReturn(new long[] {0, 0}, new long[] {2, 50}, null);
-    messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
-    verify(digest, times(3)).get(isA(Address.class));
+    JGroupsMessenger.MessageTracker tracker = mock(JGroupsMessenger.MessageTracker.class);
+    InternalDistributedMember mbr = createAddress(1234);
+    messenger.scheduledMcastSeqnos.put(mbr, tracker);
+    when(tracker.get()).thenReturn(0l, 2l, 49l, 50l, 80l);
+    Map state = new HashMap();
+    state.put("JGroups.mcastState", Long.valueOf(50));
+    messenger.waitForMessageState(mbr, state);
+    verify(tracker, times(4)).get();
+
+    reset(tracker);
+    when(tracker.get()).thenReturn(0l, 2l, 60l);
+    messenger.waitForMessageState(mbr, state);
+    verify(tracker, times(3)).get();
   }
 
   @Test
@@ -920,7 +921,11 @@ public class JGroupsMessengerJUnitTest {
         new long[] {49, 50});
     try {
       // message 50 will never arrive
-      messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
+      Map state = new HashMap();
+      state.put("JGroups.mcastState", Long.valueOf(50));
+      InternalDistributedMember mbr = createAddress(1234);
+      messenger.scheduledMcastSeqnos.put(mbr, new JGroupsMessenger.MessageTracker(30));
+      messenger.waitForMessageState(mbr, state);
       fail("expected a GemFireIOException to be thrown");
     } catch (GemFireIOException e) {
       // pass

http://git-wip-us.apache.org/repos/asf/geode/blob/61403172/geode-core/src/test/java/org/apache/geode/internal/cache/FixedPRSinglehopDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FixedPRSinglehopDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FixedPRSinglehopDUnitTest.java
old mode 100644
new mode 100755
index 7e798c8..c8f9bec
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/FixedPRSinglehopDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FixedPRSinglehopDUnitTest.java
@@ -440,7 +440,7 @@ public class FixedPRSinglehopDUnitTest extends JUnit4CacheTestCase {
 
   public static void startLocatorInVM(final int locatorPort) {
 
-    File logFile = new File("locator-" + locatorPort + ".log");
+    File logFile = new File("");
 
     Properties props = new Properties();
     props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "true");