You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/10/01 19:58:42 UTC

[01/26] incubator-geode git commit: GEODE-222 Allow redis adapter to handle live entry objects

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 e3e794d7b -> d624ea73d


GEODE-222 Allow redis adapter to handle live entry objects

Previously, encoding a response could not handle the case
where an entry was concurrently destroyed. The fix is to catch
EntryDestroyedException and skip the destroyed entry.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d5ac2f97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d5ac2f97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d5ac2f97

Branch: refs/heads/feature/GEODE-77
Commit: d5ac2f97e97fbefe6efa710c6825f248f86ec975
Parents: e040750
Author: Vito Gavrilov <vg...@pivotal.io>
Authored: Thu Aug 27 10:54:44 2015 -0700
Committer: Vito Gavrilov <vg...@pivotal.io>
Committed: Thu Aug 27 10:54:44 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/redis/Coder.java  | 194 +++++++++++--------
 1 file changed, 109 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d5ac2f97/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
index 9415cd3..0c35c93 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
@@ -11,6 +11,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.gemstone.gemfire.cache.EntryDestroyedException;
 import com.gemstone.gemfire.cache.query.Struct;
 
 /**
@@ -168,29 +169,38 @@ public class Coder {
     Iterator<Map.Entry<ByteArrayWrapper,ByteArrayWrapper>> it = items.iterator();
     ByteBuf response = alloc.buffer();
     response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(items.size() * 2));
-    response.writeBytes(CRLFar);
 
-    try {
-      while(it.hasNext()) {
-        Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
-        byte[] key = next.getKey().toBytes();
-        byte[] nextByteArray = next.getValue().toBytes();
-        response.writeByte(BULK_STRING_ID); // Add key
-        response.writeBytes(intToBytes(key.length));
-        response.writeBytes(CRLFar);
-        response.writeBytes(key);
-        response.writeBytes(CRLFar);
-        response.writeByte(BULK_STRING_ID); // Add value
-        response.writeBytes(intToBytes(nextByteArray.length));
-        response.writeBytes(CRLFar);
-        response.writeBytes(nextByteArray);
-        response.writeBytes(CRLFar);
+    int size = 0;
+    ByteBuf tmp = alloc.buffer();
+    while(it.hasNext()) {
+      Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
+      byte[] key;
+      byte[] nextByteArray;
+      try {
+        key = next.getKey().toBytes();
+        nextByteArray = next.getValue().toBytes();
+      } catch (EntryDestroyedException e) {
+        continue;
       }
-    } catch(Exception e) {
-      return null;
+      tmp.writeByte(BULK_STRING_ID); // Add key
+      tmp.writeBytes(intToBytes(key.length));
+      tmp.writeBytes(CRLFar);
+      tmp.writeBytes(key);
+      tmp.writeBytes(CRLFar);
+      tmp.writeByte(BULK_STRING_ID); // Add value
+      tmp.writeBytes(intToBytes(nextByteArray.length));
+      tmp.writeBytes(CRLFar);
+      tmp.writeBytes(nextByteArray);
+      tmp.writeBytes(CRLFar);
+      size++;
     }
 
+    response.writeBytes(intToBytes(size*2));
+    response.writeBytes(CRLFar);
+    response.writeBytes(tmp);
+
+    tmp.release();
+
     return response;
   }
 
@@ -211,27 +221,23 @@ public class Coder {
     response.writeBytes(intToBytes(items.size()));
     response.writeBytes(CRLFar);
 
-    try {
-      while(it.hasNext()) {
-        Object nextObject = it.next();
-        if (nextObject instanceof String) {
-          String next = (String) nextObject;
-          response.writeByte(BULK_STRING_ID);
-          response.writeBytes(intToBytes(next.length()));
-          response.writeBytes(CRLFar);
-          response.writeBytes(stringToBytes(next));
-          response.writeBytes(CRLFar);
-        } else if (nextObject instanceof ByteArrayWrapper) {
-          byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
-          response.writeByte(BULK_STRING_ID);
-          response.writeBytes(intToBytes(next.length));
-          response.writeBytes(CRLFar);
-          response.writeBytes(next);
-          response.writeBytes(CRLFar);
-        }
+    while(it.hasNext()) {
+      Object nextObject = it.next();
+      if (nextObject instanceof String) {
+        String next = (String) nextObject;
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(next.length()));
+        response.writeBytes(CRLFar);
+        response.writeBytes(stringToBytes(next));
+        response.writeBytes(CRLFar);
+      } else if (nextObject instanceof ByteArrayWrapper) {
+        byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(next.length));
+        response.writeBytes(CRLFar);
+        response.writeBytes(next);
+        response.writeBytes(CRLFar);
       }
-    } catch (Exception e) {
-      return null;
     }
     return response;
   }
@@ -260,7 +266,7 @@ public class Coder {
     response.writeBytes(CRLFar);
     return response;
   }
-  
+
   public static final ByteBuf getNoAuthResponse(ByteBufAllocator alloc, String error) {
     byte[] errorAr = stringToBytes(error);
     ByteBuf response = alloc.buffer(errorAr.length + 25);
@@ -306,26 +312,38 @@ public class Coder {
     Iterator<?> it = items.iterator();
     ByteBuf response = alloc.buffer();
     response.writeByte(Coder.ARRAY_ID);
-    response.writeBytes(intToBytes(items.size()));
-    response.writeBytes(Coder.CRLFar);
-
+    ByteBuf tmp = alloc.buffer();
+    int size = 0;
     while(it.hasNext()) {
       Object next = it.next();
       ByteArrayWrapper nextWrapper = null;
-      if (next instanceof Entry)
-        nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
-      else if (next instanceof Struct)
+      if (next instanceof Entry) {
+        try {
+          nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
+        } catch (EntryDestroyedException e) {
+          continue;
+        }
+      } else if (next instanceof Struct) {
         nextWrapper = (ByteArrayWrapper) ((Struct) next).getFieldValues()[1];
+      }
       if (nextWrapper != null) {
-        response.writeByte(Coder.BULK_STRING_ID);
-        response.writeBytes(intToBytes(nextWrapper.length()));
-        response.writeBytes(Coder.CRLFar);
-        response.writeBytes(nextWrapper.toBytes());
-        response.writeBytes(Coder.CRLFar);
+        tmp.writeByte(Coder.BULK_STRING_ID);
+        tmp.writeBytes(intToBytes(nextWrapper.length()));
+        tmp.writeBytes(Coder.CRLFar);
+        tmp.writeBytes(nextWrapper.toBytes());
+        tmp.writeBytes(Coder.CRLFar);
       } else {
-        response.writeBytes(Coder.bNIL);
+        tmp.writeBytes(Coder.bNIL);
       }
+      size++;
     }
+
+    response.writeBytes(intToBytes(size));
+    response.writeBytes(Coder.CRLFar);
+    response.writeBytes(tmp);
+
+    tmp.release();
+
     return response;
   }
 
@@ -335,43 +353,49 @@ public class Coder {
 
     ByteBuf buffer = alloc.buffer();
     buffer.writeByte(Coder.ARRAY_ID);
-    if (!withScores)
-      buffer.writeBytes(intToBytes(list.size()));
-    else
-      buffer.writeBytes(intToBytes(2 * list.size()));
-    buffer.writeBytes(Coder.CRLFar);
-
-    try {
-      for(Object entry: list) {
-        ByteArrayWrapper key;
-        DoubleWrapper score;
-        if (entry instanceof Entry) {
+    ByteBuf tmp = alloc.buffer();
+    int size = 0;
+
+    for(Object entry: list) {
+      ByteArrayWrapper key;
+      DoubleWrapper score;
+      if (entry instanceof Entry) {
+        try {
           key = (ByteArrayWrapper) ((Entry<?, ?>) entry).getKey();
-          score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();;
-        } else {
-          Object[] fieldVals = ((Struct) entry).getFieldValues();
-          key = (ByteArrayWrapper) fieldVals[0];
-          score = (DoubleWrapper) fieldVals[1];
-        }
-        byte[] byteAr = key.toBytes();
-        buffer.writeByte(Coder.BULK_STRING_ID);
-        buffer.writeBytes(intToBytes(byteAr.length));
-        buffer.writeBytes(Coder.CRLFar);
-        buffer.writeBytes(byteAr);
-        buffer.writeBytes(Coder.CRLFar);
-        if (withScores) {
-          String scoreString = score.toString();
-          byte[] scoreAr = stringToBytes(scoreString);
-          buffer.writeByte(Coder.BULK_STRING_ID);
-          buffer.writeBytes(intToBytes(scoreString.length()));
-          buffer.writeBytes(Coder.CRLFar);
-          buffer.writeBytes(scoreAr);
-          buffer.writeBytes(Coder.CRLFar);
+          score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();
+        } catch (EntryDestroyedException e) {
+          continue;
         }
+      } else {
+        Object[] fieldVals = ((Struct) entry).getFieldValues();
+        key = (ByteArrayWrapper) fieldVals[0];
+        score = (DoubleWrapper) fieldVals[1];
+      }
+      byte[] byteAr = key.toBytes();
+      tmp.writeByte(Coder.BULK_STRING_ID);
+      tmp.writeBytes(intToBytes(byteAr.length));
+      tmp.writeBytes(Coder.CRLFar);
+      tmp.writeBytes(byteAr);
+      tmp.writeBytes(Coder.CRLFar);
+      size++;
+      if (withScores) {
+        String scoreString = score.toString();
+        byte[] scoreAr = stringToBytes(scoreString);
+        tmp.writeByte(Coder.BULK_STRING_ID);
+        tmp.writeBytes(intToBytes(scoreString.length()));
+        tmp.writeBytes(Coder.CRLFar);
+        tmp.writeBytes(scoreAr);
+        tmp.writeBytes(Coder.CRLFar);
+        size++;
       }
-    } catch(Exception e) {
-      return null;
     }
+
+    buffer.writeBytes(intToBytes(size));
+    buffer.writeBytes(Coder.CRLFar);
+    buffer.writeBytes(tmp);
+
+    tmp.release();
+
     return buffer;
   }
 


[16/26] incubator-geode git commit: GEODE-278 Enque events in one more spot.

Posted by bs...@apache.org.
GEODE-278 Enque events in one more spot.

In the previous commit, I missed the code block for adjunct messages. Enqueue tx events in that block too.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4708d4e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4708d4e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4708d4e1

Branch: refs/heads/feature/GEODE-77
Commit: 4708d4e182f89c6a391fcad8bac854f929717685
Parents: 7bc0112
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Thu Sep 17 15:43:52 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Thu Sep 17 15:43:52 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TXCommitMessage.java |  29 ++--
 .../cache/ClientServerTransactionDUnitTest.java | 151 ++++++++++++++++---
 2 files changed, 141 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index 94aaadc..2a597e9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -737,7 +737,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
   
   public void basicProcessOps() {
     {
-      List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+      List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size());
       Collections.sort(this.farSideEntryOps);
       Iterator it = this.farSideEntryOps.iterator();
       while (it.hasNext()) {
@@ -758,14 +758,18 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
     Iterator<EntryEventImpl> ci = callbacks.iterator();
     while(ci.hasNext()) {
       EntryEventImpl ee = ci.next();
-      if(ee.getOperation().isDestroy()) {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
-      } else if(ee.getOperation().isInvalidate()) {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
-      } else if(ee.getOperation().isCreate()) {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
-      } else {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+      try {
+        if (ee.getOperation().isDestroy()) {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
+        } else if (ee.getOperation().isInvalidate()) {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
+        } else if (ee.getOperation().isCreate()) {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
+        } else {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+        }
+      } finally {
+        ee.release();
       }
     }
   }
@@ -1294,7 +1298,6 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
          * This happens when we don't have the bucket and are getting adjunct notification
          */
         EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
-        try {
         if(entryOp.filterRoutingInfo!=null) {
           eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
         }
@@ -1309,10 +1312,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
         // the message was sent and already reflects the change caused by this event.
         // In the latter case we need to invoke listeners
         final boolean skipListeners = !isDuplicate;
-        eei.invokeCallbacks(this.r, skipListeners, true);
-        } finally {
-          eei.release();
-        }
+        eei.setInvokePRCallbacks(!skipListeners);
+        pendingCallbacks.add(eei);
         return;
       }
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
index 51a8dea..4b65a95 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
@@ -21,34 +21,11 @@ import java.util.concurrent.TimeUnit;
 import javax.naming.Context;
 import javax.transaction.UserTransaction;
 
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.*;
 import org.junit.Ignore;
 
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheListener;
-import com.gemstone.gemfire.cache.CacheLoader;
-import com.gemstone.gemfire.cache.CacheLoaderException;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CommitConflictException;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EntryEvent;
-import com.gemstone.gemfire.cache.LoaderHelper;
-import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.RegionFactory;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
-import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
-import com.gemstone.gemfire.cache.TransactionEvent;
-import com.gemstone.gemfire.cache.TransactionException;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.TransactionInDoubtException;
-import com.gemstone.gemfire.cache.TransactionWriter;
-import com.gemstone.gemfire.cache.TransactionWriterException;
-import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
 import com.gemstone.gemfire.cache.client.ClientCache;
 import com.gemstone.gemfire.cache.client.ClientCacheFactory;
 import com.gemstone.gemfire.cache.client.ClientRegionFactory;
@@ -3250,4 +3227,128 @@ public void testClientCommitAndDataStoreGetsEvent() throws Exception {
     });
     
   }
+
+  public void testAdjunctMessage() {
+    Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = "testAdjunctMessage";
+
+    final int port1 = createRegionsAndStartServer(server1, false);
+    final int port2 = createRegionsAndStartServer(server2, false);
+
+    SerializableCallable createServerRegionWithInterest = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        RegionFactory rf = getCache().createRegionFactory(RegionShortcut.PARTITION);
+        rf.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+        rf.create(regionName);
+        return null;
+      }
+    };
+    server1.invoke(createServerRegionWithInterest);
+    server2.invoke(createServerRegionWithInterest);
+
+    // get two colocated keys on server1
+    final List<String> keys = (List<String>) server1.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        Region r = getCache().getRegion(regionName);
+        PartitionedRegion pr = (PartitionedRegion) r;
+        List<String> server1Keys = new ArrayList<String>();
+        for (int i=0; i<100; i++) {
+          String key = "k"+i;
+          //pr.put(key, "v" + i);
+          DistributedMember owner = pr.getOwnerForKey(pr.getKeyInfo(key));
+          if (owner.equals(pr.getMyId())) {
+            server1Keys.add(key);
+            if (server1Keys.size() == 2) {
+              break;
+            }
+          }
+        }
+        return server1Keys;
+      }
+    });
+
+    class ClientListener extends CacheListenerAdapter {
+      Set keys = new HashSet();
+      @Override
+      public void afterCreate(EntryEvent event) {
+        add(event);
+      }
+      @Override
+      public void afterUpdate(EntryEvent event) {
+        add(event);
+      }
+      private void add(EntryEvent event) {
+        keys.add(event.getKey());
+      }
+    }
+    client2.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port2);
+        ccf.setPoolMinConnections(0);
+        ccf.setPoolSubscriptionEnabled(true);
+        ccf.setPoolSubscriptionRedundancy(0);
+        ccf.set("log-level", getDUnitLogLevel());
+        ClientCache cCache = getClientCache(ccf);
+        Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).addCacheListener(new ClientListener()).create(regionName);
+        r.registerInterestRegex(".*");
+        //cCache.readyForEvents();
+        return null;
+      }
+    });
+    client1.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
+        ccf.setPoolMinConnections(0);
+        ccf.setPoolSubscriptionEnabled(true);
+        ccf.set("log-level", getDUnitLogLevel());
+        ClientCache cCache = getClientCache(ccf);
+        Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+        getCache().getCacheTransactionManager().begin();
+        for (String key : keys) {
+          r.put(key, "value");
+        }
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    client2.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        Region r = getCache().getRegion(regionName);
+        CacheListener[] listeners = r.getAttributes().getCacheListeners();
+        boolean foundListener = false;
+        for (CacheListener listener : listeners) {
+          if (listener instanceof ClientListener) {
+            foundListener = true;
+            final ClientListener clientListener = (ClientListener) listener;
+            WaitCriterion wc = new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return clientListener.keys.containsAll(keys);
+              }
+              @Override
+              public String description() {
+                return "expected:"+keys+" found:"+clientListener.keys;
+              }
+            };
+            DistributedTestCase.waitForCriterion(wc, 30*1000, 500, true);
+          }
+        }
+        assertTrue(foundListener);
+        return null;
+      }
+    });
+  }
 }


[14/26] incubator-geode git commit: GEODE-335: The serial gateway sender queue size statistic is not maintained properly across restarts

Posted by bs...@apache.org.
GEODE-335: The serial gateway sender queue size statistic is not maintained properly across restarts

Incorporated changes from GemFire


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/46ec8aea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/46ec8aea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/46ec8aea

Branch: refs/heads/feature/GEODE-77
Commit: 46ec8aeaa4a2a66e6861d5eca77446067e509ded
Parents: 6fe66f3
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Sep 16 17:39:42 2015 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Sep 16 17:39:42 2015 -0700

----------------------------------------------------------------------
 .../internal/cache/wan/serial/SerialGatewaySenderQueue.java        | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46ec8aea/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index d2691bd..ec8c391 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -206,6 +206,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     this.maximumQueueMemory = abstractSender.getMaximumMemeoryPerDispatcherQueue();
     this.stats = abstractSender.getStatistics();
     initializeRegion(abstractSender, listener);
+    // Increment queue size. Fix for bug 51988.
+    this.stats.incQueueSize(this.region.size());
     this.removalThread = new BatchRemovalThread((GemFireCacheImpl)abstractSender.getCache());
     this.removalThread.start();
     this.sender = abstractSender;


[06/26] incubator-geode git commit: GEODE-309: Move AgentUtilJUnitTest to the proper package

Posted by bs...@apache.org.
GEODE-309: Move AgentUtilJUnitTest to the proper package

Move AgentUtilJUnitTest into the com.gemstone.gemfire.management.internal
package. Clean up the test, including:

- Remove unused imports

- Uncomment a commented test and use @Ignore with description


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f7706217
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f7706217
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f7706217

Branch: refs/heads/feature/GEODE-77
Commit: f77062171309895b05d34f02d9176ce5af88f6b0
Parents: ba74e9e
Author: Kirk Lund <kl...@pivotal.io>
Authored: Fri Sep 4 12:29:24 2015 -0700
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Fri Sep 4 15:17:32 2015 -0700

----------------------------------------------------------------------
 .../src/test/java/AgentUtilJUnitTest.java       | 109 -------------------
 .../management/internal/AgentUtilJUnitTest.java | 106 ++++++++++++++++++
 2 files changed, 106 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7706217/gemfire-assembly/src/test/java/AgentUtilJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-assembly/src/test/java/AgentUtilJUnitTest.java b/gemfire-assembly/src/test/java/AgentUtilJUnitTest.java
deleted file mode 100644
index 0f7563b..0000000
--- a/gemfire-assembly/src/test/java/AgentUtilJUnitTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import com.gemstone.gemfire.management.internal.AgentUtil;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-@Category(IntegrationTest.class)
-public class AgentUtilJUnitTest {
-
-  private AgentUtil agentUtil;
-  private String version;
-
-  @Before
-  public void setUp() {
-    version = getGemfireVersion();
-    agentUtil = new AgentUtil(version);
-  }
-
-  @Test
-  public void testRESTApiExists() {
-    String gemFireWarLocation = agentUtil.getGemFireWebApiWarLocation();
-    assertNotNull(gemFireWarLocation, "GemFire REST API WAR File was not found");
-  }
-
-  /*
-   * This test should be activated when pulse gets added to Geode
-   */
-  // @Test
-  // public void testPulseWarExists() {
-  // String gemFireWarLocation = agentUtil.getPulseWarLocation();
-  // assertNotNull(gemFireWarLocation, "Pulse WAR File was not found");
-  // }
-
-  private String getGemfireVersion() {
-    String version = null;
-
-    Properties prop = new Properties();
-    InputStream inputStream = null;
-    String pathPrefix = null;
-    try {
-      pathPrefix = calculatePathPrefixToProjectRoot("gemfire-assembly/");
-      inputStream = new FileInputStream(pathPrefix + "gradle.properties");
-    } catch (FileNotFoundException e1) {
-      try {
-        pathPrefix = calculatePathPrefixToProjectRoot("gemfire-core/");
-        inputStream = new FileInputStream(pathPrefix + "gradle.properties");
-      } catch (FileNotFoundException e) {
-      }
-    }
-
-    if (inputStream != null) {
-      try {
-        prop.load(inputStream);
-        version = prop.getProperty("version");
-      } catch (FileNotFoundException e) {
-      } catch (IOException e) {
-      }
-    }
-    return version;
-  }
-
-  private String calculatePathPrefixToProjectRoot(String subDirectory) {
-    String pathPrefix = "";
-
-    String currentDirectoryPath = new File(".").getAbsolutePath();
-    int gemfireCoreLocationIx = currentDirectoryPath.indexOf(subDirectory);
-    if (gemfireCoreLocationIx < 0) {
-      return pathPrefix;
-    }
-
-    String pathFromRoot = currentDirectoryPath.substring(gemfireCoreLocationIx);
-    int segmentsCount = pathFromRoot.split("/").length - 1;
-
-    for (int i = 0; i < segmentsCount; i++) {
-      pathPrefix = pathPrefix + "../";
-    }
-    return pathPrefix;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7706217/gemfire-assembly/src/test/java/com/gemstone/gemfire/management/internal/AgentUtilJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-assembly/src/test/java/com/gemstone/gemfire/management/internal/AgentUtilJUnitTest.java b/gemfire-assembly/src/test/java/com/gemstone/gemfire/management/internal/AgentUtilJUnitTest.java
new file mode 100644
index 0000000..cd2c429
--- /dev/null
+++ b/gemfire-assembly/src/test/java/com/gemstone/gemfire/management/internal/AgentUtilJUnitTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.management.internal;
+
+import com.gemstone.gemfire.management.internal.AgentUtil;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class AgentUtilJUnitTest {
+
+  private AgentUtil agentUtil;
+  private String version;
+
+  @Before
+  public void setUp() {
+    version = getGemfireVersion();
+    agentUtil = new AgentUtil(version);
+  }
+
+  @Test
+  public void testRESTApiExists() {
+    String gemFireWarLocation = agentUtil.getGemFireWebApiWarLocation();
+    assertNotNull(gemFireWarLocation, "GemFire REST API WAR File was not found");
+  }
+
+  @Ignore("This test should be activated when pulse gets added to Geode")
+  @Test
+  public void testPulseWarExists() {
+    String gemFireWarLocation = agentUtil.getPulseWarLocation();
+    assertNotNull(gemFireWarLocation, "Pulse WAR File was not found");
+  }
+
+  private String getGemfireVersion() {
+    String version = null;
+
+    Properties prop = new Properties();
+    InputStream inputStream = null;
+    String pathPrefix = null;
+    try {
+      pathPrefix = calculatePathPrefixToProjectRoot("gemfire-assembly/");
+      inputStream = new FileInputStream(pathPrefix + "gradle.properties");
+    } catch (FileNotFoundException e1) {
+      try {
+        pathPrefix = calculatePathPrefixToProjectRoot("gemfire-core/");
+        inputStream = new FileInputStream(pathPrefix + "gradle.properties");
+      } catch (FileNotFoundException e) {
+      }
+    }
+
+    if (inputStream != null) {
+      try {
+        prop.load(inputStream);
+        version = prop.getProperty("version");
+      } catch (FileNotFoundException e) {
+      } catch (IOException e) {
+      }
+    }
+    return version;
+  }
+
+  private String calculatePathPrefixToProjectRoot(String subDirectory) {
+    String pathPrefix = "";
+
+    String currentDirectoryPath = new File(".").getAbsolutePath();
+    int gemfireCoreLocationIx = currentDirectoryPath.indexOf(subDirectory);
+    if (gemfireCoreLocationIx < 0) {
+      return pathPrefix;
+    }
+
+    String pathFromRoot = currentDirectoryPath.substring(gemfireCoreLocationIx);
+    int segmentsCount = pathFromRoot.split("/").length - 1;
+
+    for (int i = 0; i < segmentsCount; i++) {
+      pathPrefix = pathPrefix + "../";
+    }
+    return pathPrefix;
+  }
+}


[23/26] incubator-geode git commit: Change FindBugs reportLevel to 'low' so all bugs are reported

Posted by bs...@apache.org.
Change FindBugs reportLevel to 'low' so all bugs are reported


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6444722f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6444722f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6444722f

Branch: refs/heads/feature/GEODE-77
Commit: 6444722fbb92a1a9c8a2fbc65cd25d07d0ba300e
Parents: 8639b86
Author: Mark Bretl <mb...@pivotal.io>
Authored: Thu Sep 24 15:22:28 2015 -0700
Committer: Mark Bretl <mb...@pivotal.io>
Committed: Thu Sep 24 15:22:28 2015 -0700

----------------------------------------------------------------------
 build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6444722f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d7b4965..eda7642 100755
--- a/build.gradle
+++ b/build.gradle
@@ -160,7 +160,7 @@ subprojects {
         dep.transitive = true
       }
       findbugs.effort = 'max'
-      findbugs.reportLevel = 'high'
+      findbugs.reportLevel = 'low'
     }
  
     tasks.withType(FindBugs) {


[17/26] incubator-geode git commit: GEODE-346: Fix race condition and simplify junit

Posted by bs...@apache.org.
GEODE-346: Fix race condition and simplify junit

The Rebalancer JUnit tests were using mocks for testing distributed lock state
changes. Avoid mocks and use DistributedLockService directly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/71e8dc85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/71e8dc85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/71e8dc85

Branch: refs/heads/feature/GEODE-77
Commit: 71e8dc85fd40c91ef53c2f1ee7dca5cfb082b070
Parents: 4708d4e
Author: Ashvin Agrawal <as...@apache.org>
Authored: Tue Sep 22 14:59:53 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Tue Sep 22 15:06:35 2015 -0700

----------------------------------------------------------------------
 .../cache/util/AutoBalancerJUnitTest.java       | 55 +++++---------------
 1 file changed, 12 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/71e8dc85/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
index bae5f11..f0bcded 100644
--- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
+++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
@@ -11,6 +11,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -146,35 +147,21 @@ public class AutoBalancerJUnitTest {
   public void testLockSuccess() throws InterruptedException {
     cache = createBasicCache();
 
-    final DistributedLockService mockDLS = mockContext.mock(DistributedLockService.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockDLS).lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
-        will(new CustomAction("acquire lock") {
-          @Override
-          public Object invoke(Invocation invocation) throws Throwable {
-            DistributedLockService dls = new GeodeCacheFacade().getDLS();
-            return dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
-          }
-        });
-      }
-    });
-
-    final AtomicBoolean success = new AtomicBoolean(false);
+    final CountDownLatch locked = new CountDownLatch(1);
+    final AtomicBoolean success = new AtomicBoolean(true);
     Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
-        CacheOperationFacade cacheFacade = new GeodeCacheFacade() {
-          public DistributedLockService getDLS() {
-            return mockDLS;
-          };
-        };
+        CacheOperationFacade cacheFacade = new GeodeCacheFacade();
         success.set(cacheFacade.acquireAutoBalanceLock());
+        locked.countDown();
       }
     });
     thread.start();
-    thread.join();
+    locked.await(1, TimeUnit.SECONDS);
     assertTrue(success.get());
+    DistributedLockService dls = new GeodeCacheFacade().getDLS();
+    assertFalse(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
   }
 
   @Test
@@ -184,29 +171,11 @@ public class AutoBalancerJUnitTest {
     DistributedLockService dls = new GeodeCacheFacade().getDLS();
     assertTrue(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
 
-    final DistributedLockService mockDLS = mockContext.mock(DistributedLockService.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockDLS).lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
-        will(new CustomAction("acquire lock") {
-          @Override
-          public Object invoke(Invocation invocation) throws Throwable {
-            DistributedLockService dls = new GeodeCacheFacade().getDLS();
-            return dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
-          }
-        });
-      }
-    });
-
     final AtomicBoolean success = new AtomicBoolean(true);
     Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
-        CacheOperationFacade cacheFacade = new GeodeCacheFacade() {
-          public DistributedLockService getDLS() {
-            return mockDLS;
-          }
-        };
+        CacheOperationFacade cacheFacade = new GeodeCacheFacade();
         success.set(cacheFacade.acquireAutoBalanceLock());
       }
     });
@@ -263,7 +232,7 @@ public class AutoBalancerJUnitTest {
   @Test
   public void testAcquireLockAfterReleasedRemotely() throws InterruptedException {
     cache = createBasicCache();
-    
+
     final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     final Sequence sequence = mockContext.sequence("sequence");
     mockContext.checking(new Expectations() {
@@ -279,13 +248,13 @@ public class AutoBalancerJUnitTest {
         will(returnValue(0L));
       }
     });
-    
+
     AutoBalancer balancer = new AutoBalancer();
     balancer.setCacheOperationFacade(mockCacheFacade);
     balancer.getOOBAuditor().execute();
     balancer.getOOBAuditor().execute();
   }
-  
+
   @Test
   public void testFailExecuteIfLockedElsewhere() throws InterruptedException {
     cache = createBasicCache();


[21/26] incubator-geode git commit: GEODE-219 GEODE-297 GEODE-330: Disable iffy tests

Posted by bs...@apache.org.
GEODE-219 GEODE-297 GEODE-330: Disable iffy tests

Some HDFS DUnits are not reliable and need to be fixed. Disable them for now to
reduce nightly build noise.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3d648dbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3d648dbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3d648dbf

Branch: refs/heads/feature/GEODE-77
Commit: 3d648dbf30f3b3d81ee92ae00eb9f23373668f0d
Parents: 7fcb2fd
Author: Ashvin Agrawal <as...@apache.org>
Authored: Wed Sep 23 10:55:03 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Wed Sep 23 10:57:31 2015 -0700

----------------------------------------------------------------------
 .../cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java        | 4 ++--
 .../gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java      | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3d648dbf/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
index 162e529..5a58dc5 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
@@ -772,7 +772,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
   /**
    * Test that doing a destroyRegion removes all data from HDFS.
    */
-  public void testGlobalDestroyWithQueueData() {
+  public void _testGlobalDestroyWithQueueData() {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
@@ -877,7 +877,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
     });
   }
 
-  public void testGlobalDestroyFromAccessor() {
+  public void _testGlobalDestroyFromAccessor() {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3d648dbf/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
index 92687ed..5fb2949 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
@@ -441,7 +441,7 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase {
     
   }
 
-  public void testWObasicClose() throws Throwable{
+  public void _testWObasicClose() throws Throwable{
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);


[04/26] incubator-geode git commit: Merge branch 'GEODE-222' of https://github.com/vitogav/incubator-geode into develop This closes #18

Posted by bs...@apache.org.
Merge branch 'GEODE-222' of https://github.com/vitogav/incubator-geode into develop
This closes #18


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f9d70842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f9d70842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f9d70842

Branch: refs/heads/feature/GEODE-77
Commit: f9d708427d5b9826be53ae1d27ddb689adfcf562
Parents: f0f6176 d5ac2f9
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Wed Sep 2 12:33:50 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Wed Sep 2 12:33:50 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/redis/Coder.java  | 194 +++++++++++--------
 1 file changed, 109 insertions(+), 85 deletions(-)
----------------------------------------------------------------------



[09/26] incubator-geode git commit: GEODE-322: Added following parameters while creating the HDFSStore

Posted by bs...@apache.org.
GEODE-322: Added following parameters while creating the HDFSStore

- maxMemory
- minorCompactionThreads
- purgeInterval


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1517f88f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1517f88f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1517f88f

Branch: refs/heads/feature/GEODE-77
Commit: 1517f88fd99957c11f6306c82f2d3537960d22d0
Parents: 16571a6
Author: nthanvi <nt...@pivotal.io>
Authored: Thu Sep 10 17:38:04 2015 +0530
Committer: nthanvi <nt...@pivotal.io>
Committed: Thu Sep 10 17:38:04 2015 +0530

----------------------------------------------------------------------
 .../internal/cli/functions/CreateHDFSStoreFunction.java          | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1517f88f/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateHDFSStoreFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateHDFSStoreFunction.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateHDFSStoreFunction.java
index 0667020..b4e5033 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateHDFSStoreFunction.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateHDFSStoreFunction.java
@@ -96,11 +96,13 @@ public class CreateHDFSStoreFunction extends FunctionAdapter implements Internal
     hdfsStoreFactory.setMajorCompactionInterval(configHolder.getMajorCompactionInterval());
     hdfsStoreFactory.setMajorCompactionThreads(configHolder.getMajorCompactionThreads());
     hdfsStoreFactory.setMinorCompaction(configHolder.getMinorCompaction());
-    
+    hdfsStoreFactory.setMaxMemory(configHolder.getMaxMemory());
     hdfsStoreFactory.setBatchSize(configHolder.getBatchSize());
     hdfsStoreFactory.setBatchInterval(configHolder.getBatchInterval());
     hdfsStoreFactory.setDiskStoreName(configHolder.getDiskStoreName());
     hdfsStoreFactory.setDispatcherThreads(configHolder.getDispatcherThreads());
+    hdfsStoreFactory.setMinorCompactionThreads(configHolder.getMinorCompactionThreads());
+    hdfsStoreFactory.setPurgeInterval(configHolder.getPurgeInterval());
     hdfsStoreFactory.setSynchronousDiskWrite(configHolder.getSynchronousDiskWrite());
     hdfsStoreFactory.setBufferPersistent(configHolder.getBufferPersistent());
     


[26/26] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-77

Posted by bs...@apache.org.
Merge remote-tracking branch 'origin/develop' into feature/GEODE-77


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d624ea73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d624ea73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d624ea73

Branch: refs/heads/feature/GEODE-77
Commit: d624ea73d0b12b7e21ddbd5e72ed3ca07bc0c71c
Parents: e3e794d e1c2d8e
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Oct 1 10:32:51 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Oct 1 10:32:51 2015 -0700

----------------------------------------------------------------------
 .../src/test/java/AgentUtilJUnitTest.java       | 109 ---------
 .../management/internal/AgentUtilJUnitTest.java | 106 ++++++++
 .../gemstone/gemfire/cache/EvictionAction.java  |   3 +-
 .../gemfire/cache/EvictionAlgorithm.java        |   3 +-
 .../index/FunctionalIndexCreationHelper.java    |   1 -
 .../query/internal/index/PartitionedIndex.java  |  16 +-
 .../gemstone/gemfire/internal/SocketCloser.java | 241 +++++++++++++++++++
 .../gemfire/internal/SocketCreator.java         |  98 --------
 .../gemfire/internal/cache/TXCommitMessage.java |  29 +--
 .../gemfire/internal/cache/TXEntryState.java    |  19 +-
 .../cache/tier/sockets/CacheClientNotifier.java |  10 +
 .../cache/tier/sockets/CacheClientProxy.java    |  32 ++-
 .../wan/serial/SerialGatewaySenderQueue.java    |   2 +
 .../offheap/SimpleMemoryAllocatorImpl.java      |   8 +-
 .../gemstone/gemfire/internal/redis/Coder.java  | 194 ++++++++-------
 .../gemfire/internal/redis/RegionProvider.java  |  44 ++--
 .../gemfire/internal/tcp/Connection.java        | 130 ++++++----
 .../gemfire/internal/tcp/ConnectionTable.java   |  95 +++++++-
 .../internal/beans/CacheServerBridge.java       |   2 +
 .../cli/functions/CreateHDFSStoreFunction.java  |   4 +-
 .../gemfire/pdx/internal/TypeRegistry.java      |  15 +-
 .../gemfire/cache/Bug52289JUnitTest.java        |  81 +++++++
 .../internal/RegionWithHDFSBasicDUnitTest.java  |   4 +-
 .../hdfs/internal/RegionWithHDFSTestBase.java   |   4 +-
 .../query/internal/index/IndexUseJUnitTest.java |  52 ++++
 .../cache30/BridgeMembershipDUnitTest.java      |  38 +--
 .../gemstone/gemfire/cache30/CacheTestCase.java |  27 +--
 .../gemfire/cache30/RegionTestCase.java         |   1 +
 .../gemfire/internal/SocketCloserJUnitTest.java | 180 ++++++++++++++
 .../internal/SocketCloserWithWaitJUnitTest.java |  22 ++
 .../cache/ClientServerTransactionDUnitTest.java | 151 ++++++++++--
 .../cache/ConcurrentMapOpsDUnitTest.java        |  69 +++++-
 .../offheap/OffHeapValidationJUnitTest.java     |  45 +++-
 .../test/java/dunit/DistributedTestCase.java    |  59 +++++
 .../cache/util/AutoBalancerJUnitTest.java       |  55 +----
 35 files changed, 1393 insertions(+), 556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --cc gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index 55a0b53,940936f..32aa1c6
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@@ -75,9 -75,9 +75,8 @@@ import com.gemstone.gemfire.internal.ca
  import com.gemstone.gemfire.internal.cache.wan.TransportFilterSocketFactory;
  import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
  import com.gemstone.gemfire.internal.logging.LogService;
- import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
  import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
  import com.gemstone.gemfire.internal.util.PasswordUtil;
 -import com.gemstone.org.jgroups.util.ConnectionWatcher;
  
  import java.util.*;
  

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --cc gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 384ee94,508eba2..387b737
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@@ -31,8 -38,9 +38,9 @@@ import com.gemstone.gemfire.distributed
  import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
  import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
  import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 -import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
  import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.SocketCloser;
  import com.gemstone.gemfire.internal.SocketCreator;
  import com.gemstone.gemfire.internal.SystemTimer;
  import com.gemstone.gemfire.internal.i18n.LocalizedStrings;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/BridgeMembershipDUnitTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/CacheTestCase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConcurrentMapOpsDUnitTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d624ea73/gemfire-core/src/test/java/dunit/DistributedTestCase.java
----------------------------------------------------------------------
diff --cc gemfire-core/src/test/java/dunit/DistributedTestCase.java
index 2e6346d,c78510a..41d7068
--- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java
+++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
@@@ -737,8 -802,11 +760,9 @@@ public abstract class DistributedTestCa
      InternalBridgeMembership.unregisterAllListeners();
      ClientStatsManager.cleanupForTests();
      unregisterInstantiatorsInThisVM();
 -    GemFireTracer.DEBUG = Boolean.getBoolean("DistributionManager.DEBUG_JAVAGROUPS");
 -    Protocol.trace = GemFireTracer.DEBUG;
      DistributionMessageObserver.setInstance(null);
      QueryObserverHolder.reset();
+     DiskStoreObserver.setInstance(null);
      if (InternalDistributedSystem.systemAttemptingReconnect != null) {
        InternalDistributedSystem.systemAttemptingReconnect.stopReconnecting();
      }


[07/26] incubator-geode git commit: GEODE-313 Improve redis adapter region exception handling

Posted by bs...@apache.org.
GEODE-313 Improve redis adapter region exception handling

When creating regions for sorted sets, index creation exceptions are now
handled explicitly so that possible errors are dealt with correctly. Also,
error logging from the RegionProvider class has been removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c1de3fec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c1de3fec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c1de3fec

Branch: refs/heads/feature/GEODE-77
Commit: c1de3fec4e7937669c0ce39bbdeb77a597414b16
Parents: ba74e9e
Author: Vito Gavrilov <vg...@pivotal.io>
Authored: Tue Sep 8 10:55:12 2015 -0700
Committer: Vito Gavrilov <vg...@pivotal.io>
Committed: Tue Sep 8 10:55:12 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/redis/RegionProvider.java  | 44 ++++++++++----------
 1 file changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c1de3fec/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
index cadaf5f..0240a4c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
@@ -13,12 +13,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.gemstone.gemfire.LogWriter;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheTransactionManager;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexInvalidException;
 import com.gemstone.gemfire.cache.query.IndexNameConflictException;
 import com.gemstone.gemfire.cache.query.Query;
 import com.gemstone.gemfire.cache.query.QueryInvalidException;
@@ -75,7 +76,6 @@ public class RegionProvider implements Closeable {
   private final RegionShortcut defaultRegionType;
   private static final CreateAlterDestroyRegionCommands cliCmds = new CreateAlterDestroyRegionCommands();
   private final ConcurrentHashMap<String, Lock> locks;
-  private final LogWriter logger;
 
   public RegionProvider(Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion, Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion, Region<String, RedisDataType> redisMetaRegion, ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap, ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut) {
     if (stringsRegion == null || hLLRegion == null || redisMetaRegion == null)
@@ -90,7 +90,6 @@ public class RegionProvider implements Closeable {
     this.expirationExecutor = expirationExecutor;
     this.defaultRegionType = defaultShortcut;
     this.locks = new ConcurrentHashMap<String, Lock>();
-    this.logger = this.cache.getLogger();
   }
 
   public boolean existsKey(ByteArrayWrapper key) {
@@ -219,11 +218,16 @@ public class RegionProvider implements Closeable {
           // simply ignore. Calls to getRegion or getOrCreate will work correctly
           if (r == null)
             return;
-          
-          if (type == RedisDataType.REDIS_LIST)
+
+          if (type == RedisDataType.REDIS_LIST) {
             doInitializeList(key, r);
-          else if (type == RedisDataType.REDIS_SORTEDSET)
-            doInitializeSortedSet(key, r);
+          } else if (type == RedisDataType.REDIS_SORTEDSET) {
+            try {
+              doInitializeSortedSet(key, r);
+            } catch (RegionNotFoundException | IndexInvalidException e) {
+              //ignore
+            }
+          }
           this.regions.put(key, r);
         }
       } finally {
@@ -266,10 +270,15 @@ public class RegionProvider implements Closeable {
               concurrentCreateDestroyException = null;
               r = createRegionGlobally(stringKey);
               try {
-                if (type == RedisDataType.REDIS_LIST)
+                if (type == RedisDataType.REDIS_LIST) {
                   doInitializeList(key, r);
-                else if (type == RedisDataType.REDIS_SORTEDSET)
-                  doInitializeSortedSet(key, r);
+                } else if (type == RedisDataType.REDIS_SORTEDSET) {
+                  try {
+                    doInitializeSortedSet(key, r);
+                  } catch (RegionNotFoundException | IndexInvalidException e) {
+                    concurrentCreateDestroyException = e;
+                  }
+                }
               } catch (QueryInvalidException e) {
                 if (e.getCause() instanceof RegionNotFoundException) {
                   concurrentCreateDestroyException = e;
@@ -326,17 +335,13 @@ public class RegionProvider implements Closeable {
     this.regions.remove(key);
   }
 
-  private void doInitializeSortedSet(ByteArrayWrapper key, Region<?, ?> r) {
+  private void doInitializeSortedSet(ByteArrayWrapper key, Region<?, ?> r) throws RegionNotFoundException, IndexInvalidException {
     String fullpath = r.getFullPath();
     try {
       queryService.createIndex("scoreIndex", "entry.value.score", r.getFullPath() + ".entrySet entry");
       queryService.createIndex("scoreIndex2", "value.score", r.getFullPath() + ".values value");
-    } catch (Exception e) {
-      if (!(e instanceof IndexNameConflictException)) {
-        if (logger.errorEnabled()) {
-          logger.error(e);
-        }
-      }
+    } catch (IndexNameConflictException | IndexExistsException | UnsupportedOperationException e) {
+      // ignore, these indexes already exist or unsupported but make sure prepared queries are made
     }
     HashMap<Enum<?>, Query> queryList = new HashMap<Enum<?>, Query>();
     for (SortedSetQuery lq: SortedSetQuery.values()) {
@@ -379,9 +384,6 @@ public class RegionProvider implements Closeable {
         String err = "";
         while(result.hasNextLine())
           err += result.nextLine();
-        if (this.logger.errorEnabled()) {
-          this.logger.error("Region creation failure- "+ err);
-        }
         throw new RegionCreationException(err);
       }
     } while(r == null); // The region can be null in the case that it is concurrently destroyed by
@@ -397,7 +399,7 @@ public class RegionProvider implements Closeable {
     } else {
       return this.queryService.newQuery(((SortedSetQuery)query).getQueryString(this.regions.get(key).getFullPath()));
     }
-    */
+     */
   }
 
   /**


[05/26] incubator-geode git commit: GEODE-307: Clear DiskStoreObserver after each test

Posted by bs...@apache.org.
GEODE-307: Clear DiskStoreObserver after each test

An observer was being left over and causing later tests to fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ba74e9ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ba74e9ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ba74e9ea

Branch: refs/heads/feature/GEODE-77
Commit: ba74e9ea849e1ea38b742278e28d308218cedc63
Parents: f9d7084
Author: Dan Smith <up...@apache.org>
Authored: Wed Sep 2 15:23:11 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Thu Sep 3 10:46:10 2015 -0700

----------------------------------------------------------------------
 gemfire-core/src/test/java/dunit/DistributedTestCase.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ba74e9ea/gemfire-core/src/test/java/dunit/DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/DistributedTestCase.java b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
index 8aa8b6d..2e7ac03 100755
--- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java
+++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
@@ -52,6 +52,7 @@ import com.gemstone.gemfire.internal.InternalInstantiator;
 import com.gemstone.gemfire.internal.OSProcess;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.admin.ClientStatsManager;
+import com.gemstone.gemfire.internal.cache.DiskStoreObserver;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.InitialImageOperation;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
@@ -784,6 +785,7 @@ public abstract class DistributedTestCase extends TestCase implements java.io.Se
     Protocol.trace = GemFireTracer.DEBUG;
     DistributionMessageObserver.setInstance(null);
     QueryObserverHolder.reset();
+    DiskStoreObserver.setInstance(null);
     if (InternalDistributedSystem.systemAttemptingReconnect != null) {
       InternalDistributedSystem.systemAttemptingReconnect.stopReconnecting();
     }


[20/26] incubator-geode git commit: GEODE-332: use thread pools for p2p readers and async close

Posted by bs...@apache.org.
GEODE-332: use thread pools for p2p readers and async close

The old code always created a brand new thread whenever it
wanted to asynchronously close a socket, create a new p2p
reader, or handshake with a p2p reader. Now threads are
reused, which improves latency. In addition, a hard cap on
the maximum number of closer threads prevents a large number
of close threads from causing an OutOfMemoryError.

Introduced a new SocketCloser class for async close.
The ConnectionTable has a SocketCloser instance
for closing peer-to-peer sockets and the
CacheClientNotifier has one for closing sockets
used by a cache server to send queue data to clients.

The ConnectionTable closer will have at most 8 threads
per address its sockets are connected to. If these
threads are not used for 120 seconds they will time out.
This timeout can be configured using the
p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS system property.
The maximum threads per address can be changed from 8
using the p2p.ASYNC_CLOSE_POOL_MAX_THREADS system property.
By default, when an async socket close request is made,
the requestor does not wait for the request to complete.
In previous releases the requestor waited 50 milliseconds.
Now a wait can be configured using the
p2p.ASYNC_CLOSE_WAIT_MILLISECONDS system property.

The CacheClientNotifier closer will have at most 1 thread
per address its sockets are connected to. If these
threads are not used for 120 seconds they will time out.
This timeout can be configured using the
p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS system property.
Also this closer forces all requestors to wait 50 milliseconds
for the close to be done.

ConnectionTable also uses a thread pool whenever
it needs a thread for a p2p reader or when a p2p sender
needs a thread to do the initial handshake. This pool
has an unlimited number of threads, but if a thread is
not used for 120 seconds it will time out. This timeout
can be configured using the p2p.READER_POOL_KEEP_ALIVE_TIME
system property.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7fcb2fd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7fcb2fd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7fcb2fd9

Branch: refs/heads/feature/GEODE-77
Commit: 7fcb2fd938338c9fde2a152364c094c196c5a045
Parents: eb89661
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:03:40 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Sep 23 10:13:54 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/SocketCloser.java | 241 +++++++++++++++++++
 .../gemfire/internal/SocketCreator.java         |  98 --------
 .../cache/tier/sockets/CacheClientNotifier.java |  10 +
 .../cache/tier/sockets/CacheClientProxy.java    |  32 ++-
 .../gemfire/internal/tcp/Connection.java        | 130 ++++++----
 .../gemfire/internal/tcp/ConnectionTable.java   |  95 +++++++-
 .../gemfire/internal/SocketCloserJUnitTest.java | 180 ++++++++++++++
 .../internal/SocketCloserWithWaitJUnitTest.java |  22 ++
 8 files changed, 639 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
new file mode 100644
index 0000000..8468daa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
@@ -0,0 +1,241 @@
+package com.gemstone.gemfire.internal;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+
+/**
+ * This class allows sockets to be closed without blocking.
+ * In some cases we have seen a call of socket.close block for minutes.
+ * This class maintains a thread pool for every other member we have
+ * connected sockets to. Any request to close by default returns immediately
+ * to the caller while the close is called by a background thread.
+ * The requester can wait for a configured amount of time by setting
+ * the "p2p.ASYNC_CLOSE_WAIT_MILLISECONDS" system property.
+ * Idle threads that are not doing a close will timeout after 2 minutes.
+ * This can be configured by setting the
+ * "p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS" system property.
+ * A pool exists for each remote address that we have a socket connected to.
+ * That way if close is taking a long time to one address we can still get closes
+ * done to another address.
+ * Each address pool by default has at most 8 threads. This max threads can be
+ * configured using the "p2p.ASYNC_CLOSE_POOL_MAX_THREADS" system property.
+ */
+public class SocketCloser {
+  private static final Logger logger = LogService.getLogger();
+  /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
+  static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+  /** Maximum number of threads that can be doing a socket close. Any close requests over this max will queue up waiting for a thread. */
+  static final int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  /** How many milliseconds the synchronous requester waits for the async close to happen. Default is 0. Prior releases waited 50ms. */
+  static final long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+
+  /** Map from remote address to its pool of async close threads; also the monitor guarding {@link #closed}. */
+  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
+  private final long asyncClosePoolKeepAliveSeconds;
+  private final int asyncClosePoolMaxThreads;
+  private final long asyncCloseWaitTime;
+  private final TimeUnit asyncCloseWaitUnits;
+  /** Set by {@link #close()}; only read or written while synchronized on asyncCloseExecutors. */
+  private boolean closed;
+
+  /**
+   * Creates a closer configured from the ASYNC_CLOSE_* constants of this class.
+   */
+  public SocketCloser() {
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_WAIT_MILLISECONDS, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Creates a closer that uses the default keep alive time.
+   * @param asyncClosePoolMaxThreads maximum number of close threads per address
+   * @param asyncCloseWaitMillis milliseconds asyncClose waits for the close to be done; 0 means do not wait
+   */
+  public SocketCloser(int asyncClosePoolMaxThreads, long asyncCloseWaitMillis) {
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, asyncClosePoolMaxThreads, asyncCloseWaitMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * @param asyncClosePoolKeepAliveSeconds seconds an idle close thread is kept before it times out
+   * @param asyncClosePoolMaxThreads maximum number of close threads per address
+   * @param asyncCloseWaitTime how long asyncClose waits for the close to be done; 0 means do not wait
+   * @param asyncCloseWaitUnits the unit of asyncCloseWaitTime
+   */
+  public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads, long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
+    this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
+    this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
+    this.asyncCloseWaitTime = asyncCloseWaitTime;
+    this.asyncCloseWaitUnits = asyncCloseWaitUnits;
+  }
+
+  /**
+   * Returns the maximum number of close threads each address pool may have.
+   */
+  public int getMaxThreads() {
+    return this.asyncClosePoolMaxThreads;
+  }
+
+  /**
+   * Returns the thread pool used to close sockets connected to the given
+   * address, creating and registering it on first use.
+   */
+  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
+    synchronized (asyncCloseExecutors) {
+      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
+      if (pool == null) {
+        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+        ThreadFactory tf = new ThreadFactory() {
+          @Override
+          public Thread newThread(final Runnable command) {
+            Thread thread = new Thread(tg, command);
+            thread.setDaemon(true);
+            return thread;
+          }
+        };
+        // The work queue is unbounded so the pool never grows past its core size.
+        // That is why the max thread count is passed as both core and max size;
+        // allowCoreThreadTimeOut lets idle threads go away after the keep alive time.
+        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
+        pool.allowCoreThreadTimeOut(true);
+        asyncCloseExecutors.put(address, pool);
+      }
+      return pool;
+    }
+  }
+
+  /**
+   * Call this method if you know all the resources in the closer
+   * for the given address are no longer needed.
+   * Currently a thread pool is kept for each address and if you
+   * know that an address no longer needs its pool then you should
+   * call this method.
+   */
+  public void releaseResourcesForAddress(String address) {
+    synchronized (asyncCloseExecutors) {
+      ThreadPoolExecutor pool = asyncCloseExecutors.remove(address);
+      if (pool != null) {
+        pool.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Call close when you are all done with your socket closer.
+   * If you call asyncClose after close is called then the
+   * asyncClose will be done synchronously.
+   */
+  public void close() {
+    synchronized (asyncCloseExecutors) {
+      if (!this.closed) {
+        this.closed = true;
+        for (ThreadPoolExecutor pool: asyncCloseExecutors.values()) {
+          pool.shutdown();
+        }
+        asyncCloseExecutors.clear();
+      }
+    }
+  }
+
+  /**
+   * Hands the close task to the pool of the given address.
+   * This method never blocks waiting for the task to run.
+   * @return a Future the requester should wait on (see waitForClose), or
+   *   null if this closer is configured to not wait for closes
+   */
+  private Future<?> asyncExecute(String address, Runnable r) {
+    // Waiting 50ms for the async close request to complete is what the old (close per thread)
+    // code did. But now that we will not create a thread for every close request
+    // it seems better to let the thread that requested the close to move on quickly.
+    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
+    // can be set to how many milliseconds to wait.
+    if (this.asyncCloseWaitTime == 0) {
+      // Nobody will wait, so use execute: an exception thrown by the task then
+      // reaches the pool thread's uncaught exception handling instead of being
+      // held in a Future that is never read.
+      getAsyncThreadExecutor(address).execute(r);
+      return null;
+    }
+    return getAsyncThreadExecutor(address).submit(r);
+  }
+
+  /**
+   * Waits at most the configured wait time for a submitted close to finish.
+   * Must be called without holding the asyncCloseExecutors monitor so that a
+   * slow close does not block close requests made by other threads.
+   */
+  private void waitForClose(Future<?> pendingClose) {
+    try {
+      pendingClose.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+    } catch (InterruptedException e) {
+      // Stop waiting and let the close continue in the background, but keep
+      // the interrupt visible to our caller.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException | TimeoutException e) {
+      // We only wait a bounded amount of time for the close to happen.
+      // It is ok to ignore these exceptions and let the close continue
+      // in the background.
+    }
+  }
+
+  /**
+   * Closes the specified socket in a background thread.
+   * In some cases we see close hang (see bug 33665).
+   * Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
+   * this method may block for a certain amount of time.
+   * If it is called after the SocketCloser is closed then a normal
+   * synchronous close is done.
+   * @param sock the socket to close
+   * @param address identifies who the socket is connected to
+   * @param extra an optional Runnable with stuff to execute in the async thread
+   */
+  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
+    if (sock == null || sock.isClosed()) {
+      return;
+    }
+    boolean doItInline = false;
+    Future<?> pendingClose = null;
+    try {
+      // Only the hand-off to the pool happens under the lock. It keeps close()
+      // from shutting the pool down between the closed check and the submit.
+      synchronized (asyncCloseExecutors) {
+        if (this.closed) {
+          // this SocketCloser has been closed so do a synchronous, inline, close
+          doItInline = true;
+        } else {
+          pendingClose = asyncExecute(address, new Runnable() {
+            @Override
+            public void run() {
+              Thread.currentThread().setName("AsyncSocketCloser for " + address);
+              try {
+                if (extra != null) {
+                  extra.run();
+                }
+                inlineClose(sock);
+              } finally {
+                Thread.currentThread().setName("unused AsyncSocketCloser");
+              }
+            }
+          });
+        }
+      }
+    } catch (OutOfMemoryError ignore) {
+      // If we can't start a thread to close the socket just do it inline.
+      // See bug 50573.
+      doItInline = true;
+    }
+    if (doItInline) {
+      if (extra != null) {
+        extra.run();
+      }
+      inlineClose(sock);
+    } else if (pendingClose != null) {
+      // The optional bounded wait is done after the lock is released so it
+      // only delays this requester.
+      waitForClose(pendingClose);
+    }
+  }
+
+  /**
+   * Closes the specified socket
+   * @param sock the socket to close
+   */
+  private static void inlineClose(final Socket sock) {
+    // the next two statements are a mad attempt to fix bug
+    // 36041 - segv in jrockit in pthread signaling code.  This
+    // seems to alleviate the problem.
+    try {
+      sock.shutdownInput();
+      sock.shutdownOutput();
+    } catch (Exception ignored) {
+      // best effort only; the close below is what matters
+    }
+    try {
+      sock.close();
+    } catch (IOException ignore) {
+    } catch (VirtualMachineError err) {
+      SystemFailure.initiateFailure(err);
+      // If this ever returns, rethrow the error.  We're poisoned
+      // now, so don't let this thread continue.
+      throw err;
+    } catch (java.security.ProviderException pe) {
+      // some ssl implementations have trouble with termination and throw
+      // this exception.  See bug #40783
+    } catch (Error e) {
+      // Whenever you catch Error or Throwable, you must also
+      // catch VirtualMachineError (see above).  However, there is
+      // _still_ a possibility that you are dealing with a cascading
+      // error condition, so you also need to check to see if the JVM
+      // is still usable:
+      SystemFailure.checkFailure();
+      // Sun's NIO implementation has been known to throw Errors
+      // that are caused by IOExceptions.  If this is the case, it's
+      // okay.
+      if (e.getCause() instanceof IOException) {
+        // okay...
+      } else {
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index ff4a22c..940936f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -75,7 +75,6 @@ import com.gemstone.gemfire.internal.cache.wan.TransportFilterServerSocket;
 import com.gemstone.gemfire.internal.cache.wan.TransportFilterSocketFactory;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.util.PasswordUtil;
 import com.gemstone.org.jgroups.util.ConnectionWatcher;
@@ -1197,103 +1196,6 @@ public class SocketCreator  implements com.gemstone.org.jgroups.util.SockCreator
     return (String[]) v.toArray( new String[ v.size() ] );
   }
   
-  /**
-   * Closes the specified socket in a background thread and waits a limited
-   * amount of time for the close to complete. In some cases we see close
-   * hang (see bug 33665).
-   * Made public so it can be used from CacheClientProxy.
-   * @param sock the socket to close
-   * @param who who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute in the async thread
-   */
-  public static void asyncClose(final Socket sock, String who, final Runnable extra) {
-    if (sock == null || sock.isClosed()) {
-      return;
-    }
-    try {
-    ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-
-    Thread t = new Thread(tg, new Runnable() {
-        public void run() {
-          if (extra != null) {
-            extra.run();
-          }
-          inlineClose(sock);
-        }
-      }, "AsyncSocketCloser for " + who);
-    t.setDaemon(true);
-    try {
-      t.start();
-    } catch (OutOfMemoryError ignore) {
-      // If we can't start a thread to close the socket just do it inline.
-      // See bug 50573.
-      inlineClose(sock);
-      return;
-    }
-    try {
-      // [bruce] if the network fails, this will wait the full amount of time
-      // on every close, so it must be kept very short.  it was 750ms before,
-      // causing frequent hangs in net-down hydra tests
-      t.join(50/*ms*/);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-    }
-    catch (VirtualMachineError e) {
-      SystemFailure.initiateFailure(e);
-      // NOTREACHED
-      throw e;
-    }
-  }
-  
-
-  /**
-   * Closes the specified socket
-   * @param sock the socket to close
-   */
-  public static void inlineClose(final Socket sock) {
-    
-    // the next two statements are a mad attempt to fix bug
-    // 36041 - segv in jrockit in pthread signaling code.  This
-    // seems to alleviate the problem.
-    try {
-      sock.shutdownInput();
-      sock.shutdownOutput();
-    }
-    catch (Exception e) {
-    }
-    try {
-      sock.close();
-    } catch (IOException ignore) {
-    } 
-    catch (VirtualMachineError err) {
-      SystemFailure.initiateFailure(err);
-      // If this ever returns, rethrow the error.  We're poisoned
-      // now, so don't let this thread continue.
-      throw err;
-    }
-    catch (java.security.ProviderException pe) {
-      // some ssl implementations have trouble with termination and throw
-      // this exception.  See bug #40783
-    }
-    catch (Error e) {
-      // Whenever you catch Error or Throwable, you must also
-      // catch VirtualMachineError (see above).  However, there is
-      // _still_ a possibility that you are dealing with a cascading
-      // error condition, so you also need to check to see if the JVM
-      // is still usable:
-      SystemFailure.checkFailure();
-      // Sun's NIO implementation has been known to throw Errors
-      // that are caused by IOExceptions.  If this is the case, it's
-      // okay.
-      if (e.getCause() instanceof IOException) {
-        // okay...
-
-      } else {
-        throw e;
-      }
-    }
-  }
   
   protected void initializeClientSocketFactory() {
     this.clientSocketFactory = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 2cede25..deddfd1 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -73,6 +73,7 @@ import com.gemstone.gemfire.internal.ClassLoadUtil;
 import com.gemstone.gemfire.internal.DummyStatisticsFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.Version;
@@ -1668,6 +1669,8 @@ public class CacheClientNotifier {
 
       // Close the statistics
       this._statistics.close();
+      
+      this.socketCloser.close();
     } 
   }
 
@@ -2120,6 +2123,7 @@ public class CacheClientNotifier {
     // Set the Cache
     this.setCache((GemFireCacheImpl)cache);
     this.acceptorStats = acceptorStats;
+    this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms for close
 
     // Set the LogWriter
     this.logWriter = (InternalLogWriter)cache.getLogger();
@@ -2385,6 +2389,10 @@ public class CacheClientNotifier {
     return this.acceptorStats;
   }
   
+  public SocketCloser getSocketCloser() {
+    return this.socketCloser;
+  }
+  
   public void addCompiledQuery(DefaultQuery query){
     if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null){
       // Added successfully.
@@ -2651,6 +2659,8 @@ public class CacheClientNotifier {
 
   private SystemTimer.SystemTimerTask clientPingTask;
   
+  private final SocketCloser socketCloser;
+  
   private static final long CLIENT_PING_TASK_PERIOD =
     Long.getLong("gemfire.serverToClientPingPeriod", 60000);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 15f83bb..85c7493 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -63,7 +63,6 @@ import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 import com.gemstone.gemfire.internal.Version;
@@ -116,6 +115,8 @@ public class CacheClientProxy implements ClientSession {
    * The socket between the server and the client
    */
   protected Socket _socket;
+  
+  private final AtomicBoolean _socketClosed = new AtomicBoolean();
 
   /**
    * A communication buffer used by each message we send to the client
@@ -960,10 +961,7 @@ public class CacheClientProxy implements ClientSession {
       // to fix bug 37684
       // 1. check to see if dispatcher is still alive
       if (this._messageDispatcher.isAlive()) {
-        if (this._socket != null && !this._socket.isClosed()) {
-          SocketCreator.asyncClose(this._socket, this._remoteHostAddress, null);
-          getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
-        }
+        closeSocket();
         destroyRQ();
         alreadyDestroyed = true;
         this._messageDispatcher.interrupt();
@@ -996,19 +994,27 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  private void closeTransientFields() {
-    // Close the socket
-    if (this._socket != null && !this._socket.isClosed()) {
-      try {
-        this._socket.close();
-        getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
-      } catch (IOException e) {/*ignore*/}
+  private void closeSocket() {
+    if (this._socketClosed.compareAndSet(false, true)) {
+      // Close the socket
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, null);
+      getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
+  }
+  
+  private void closeTransientFields() {
+    closeSocket();
 
     // Null out comm buffer, host address, ports and proxy id. All will be
     // replaced when the client reconnects.
     releaseCommBuffer();
-    this._remoteHostAddress = null;
+    {
+      String remoteHostAddress = this._remoteHostAddress;
+      if (remoteHostAddress != null) {
+        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+        this._remoteHostAddress = null;
+      }
+    }
     try {
       this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
     } catch (CacheClosedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index cd1b7dc..88bca22 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -62,6 +62,7 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
@@ -115,6 +116,11 @@ public class Connection implements Runnable {
 
   /** the table holding this connection */
   final ConnectionTable owner;
+  
+  /** Set to false once run() is terminating. Using this instead of Thread.isAlive  
+    * as the reader thread may be a pooled thread.
+    */ 
+  private volatile boolean isRunning = false; 
 
   /** true if connection is a shared resource that can be used by more than one thread */
   private boolean sharedResource;
@@ -136,11 +142,14 @@ public class Connection implements Runnable {
   }
 
   private final static ThreadLocal isReaderThread = new ThreadLocal();
-  // return true if this thread is a reader thread
   public final static void makeReaderThread() {
     // mark this thread as a reader thread
-    isReaderThread.set(Boolean.TRUE);
+    makeReaderThread(true);
   }
+  private final static void makeReaderThread(boolean v) {
+    isReaderThread.set(v);
+  }
+  // return true if this thread is a reader thread
   public final static boolean isReaderThread() {
     Object o = isReaderThread.get();
     if (o == null) {
@@ -319,7 +328,7 @@ public class Connection implements Runnable {
   private final Object handshakeSync = new Object();
 
   /** message reader thread */
-  Thread readerThread;
+  private volatile Thread readerThread;
 
 //  /**
 //   * When a thread owns the outLock and is writing to the socket, it must
@@ -523,7 +532,7 @@ public class Connection implements Runnable {
     Connection c = new Connection(t, s);
     boolean readerStarted = false;
     try {
-      c.startReader();
+      c.startReader(t);
       readerStarted = true;
     } finally {
       if (!readerStarted) {
@@ -571,7 +580,7 @@ public class Connection implements Runnable {
       }
       catch (IOException io) {
         logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        SocketCreator.asyncClose(s, this.remoteAddr.toString(), null);
+        t.getSocketCloser().asyncClose(s, this.remoteAddr.toString(), null);
         throw io;
       }
     }
@@ -809,6 +818,8 @@ public class Connection implements Runnable {
     }
   }
   
+  private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
+  
   /**
    * asynchronously close this connection
    * 
@@ -819,28 +830,31 @@ public class Connection implements Runnable {
     
     // we do the close in a background thread because the operation may hang if 
     // there is a problem with the network.  See bug #46659
-    Runnable r = new Runnable() {
-      public void run() {
-        boolean rShuttingDown = readerShuttingDown;
-        synchronized(stateLock) {
-          if (readerThread != null && readerThread.isAlive() &&
-              !rShuttingDown && connectionState == STATE_READING
-              || connectionState == STATE_READING_ACK) {
-            readerThread.interrupt();
-          }
-        }
-      }
-    };
+
     // if simulating sickness, sockets must be closed in-line so that tests know
     // that the vm is sick when the beSick operation completes
     if (beingSick) {
-      r.run();
+      prepareForAsyncClose();
     }
     else {
-      SocketCreator.asyncClose(this.socket, String.valueOf(this.remoteAddr), r);
+      if (this.asyncCloseCalled.compareAndSet(false, true)) {
+        Socket s = this.socket;
+        if (s != null && !s.isClosed()) {
+          prepareForAsyncClose();
+          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
+        }
+      }
     }
   }
   
+  private void prepareForAsyncClose() {
+    synchronized(stateLock) {
+      if (readerThread != null && isRunning && !readerShuttingDown
+          && (connectionState == STATE_READING || connectionState == STATE_READING_ACK)) {
+        readerThread.interrupt();
+      }
+    }
+  }
 
   private static final int CONNECT_HANDSHAKE_SIZE = 4096;
 
@@ -951,7 +965,7 @@ public class Connection implements Runnable {
    *
    * @throws IOException if handshake fails
    */
-  private void attemptHandshake() throws IOException {
+  private void attemptHandshake(ConnectionTable connTable) throws IOException {
     // send HANDSHAKE
     // send this server's port.  It's expected on the other side
     if (useNIO()) {
@@ -961,7 +975,7 @@ public class Connection implements Runnable {
       handshakeStream();
     }
 
-    startReader(); // this reader only reads the handshake and then exits
+    startReader(connTable); // this reader only reads the handshake and then exits
     waitForHandshake(); // waiting for reply
   }
 
@@ -1099,7 +1113,7 @@ public class Connection implements Runnable {
         if (conn != null) {
           // handshake
           try {
-            conn.attemptHandshake();
+            conn.attemptHandshake(t);
             if (conn.isSocketClosed()) {
               // something went wrong while reading the handshake
               // and the socket was closed or this guy sent us a
@@ -1601,26 +1615,35 @@ public class Connection implements Runnable {
         this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
       isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
     }
-    if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive()
-        && this.readerThread != Thread.currentThread()) {
-      try {
-        this.readerThread.join(500);
-        if (this.readerThread.isAlive() && !this.readerShuttingDown
-            && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
-          this.readerThread.join(1500);
-          if (this.readerThread.isAlive()) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
+    {
+      // Now that readerThread is returned to a pool after we close
+      // we need to be more careful not to join on a thread that belongs
+      // to someone else.
+      Thread readerThreadSnapshot = this.readerThread;
+      if (!beingSick && readerThreadSnapshot != null && !isIBM
+          && this.isRunning && !this.readerShuttingDown
+          && readerThreadSnapshot != Thread.currentThread()) {
+        try {
+          readerThreadSnapshot.join(500);
+          readerThreadSnapshot = this.readerThread;
+          if (this.isRunning && !this.readerShuttingDown
+              && readerThreadSnapshot != null
+              && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
+            readerThreadSnapshot.join(1500);
+            if (this.isRunning) {
+              logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
+            }
           }
         }
+        catch (IllegalThreadStateException ignore) {
+          // ignored - thread already stopped
+        }
+        catch (InterruptedException ignore) {
+          Thread.currentThread().interrupt();
+          // but keep going, we're trying to close.
+        }
       }
-      catch (IllegalThreadStateException ignore) {
-        // ignored - thread already stopped
-      }
-      catch (InterruptedException ignore) {
-        Thread.currentThread().interrupt();
-        // but keep going, we're trying to close.
-      }
-    } // !onlyCleanup
+    }
 
     closeBatchBuffer();
     closeAllMsgDestreamers();
@@ -1677,26 +1700,22 @@ public class Connection implements Runnable {
   }
 
   /** starts a reader thread */
-  private void startReader() {
-    ThreadGroup group =
-      LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
-    Assert.assertTrue(this.readerThread == null);
-    this.readerThread =
-      new Thread(group, this, p2pReaderName());
-    this.readerThread.setDaemon(true);
-    stopped = false;
-    this.readerThread.start();
-  }
+  private void startReader(ConnectionTable connTable) { 
+    Assert.assertTrue(!this.isRunning); 
+    stopped = false; 
+    this.isRunning = true; 
+    connTable.executeCommand(this);  
+  } 
 
 
   /** in order to read non-NIO socket-based messages we need to have a thread
       actively trying to grab bytes out of the sockets input queue.
       This is that thread. */
   public void run() {
+    this.readerThread = Thread.currentThread();
+    this.readerThread.setName(p2pReaderName());
     ConnectionTable.threadWantsSharedResources();
-    if (this.isReceiver) {
-      makeReaderThread();
-    }
+    makeReaderThread(this.isReceiver);
     try {
       if (useNIO()) {
         runNioReader();
@@ -1725,6 +1744,11 @@ public class Connection implements Runnable {
       // for the handshake.
       // see bug 37524 for an example of listeners hung in waitForHandshake
       notifyHandshakeWaiter(false);
+      this.readerThread.setName("unused p2p reader");
+      synchronized (this.stateLock) {
+        this.isRunning = false;
+        this.readerThread = null;
+      }
     } // finally
   }
 
@@ -3307,7 +3331,7 @@ public class Connection implements Runnable {
   protected Object stateLock = new Object();
   
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState;
+  protected byte connectionState = STATE_IDLE;
   
   /*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/
   /** the connection is idle, but may be in use */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 9beb947..508eba2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -19,8 +19,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.logging.log4j.Logger;
@@ -33,10 +40,12 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 
@@ -129,7 +138,15 @@ public class ConnectionTable  {
    */
   private volatile boolean closed = false;
 
-
+  /**
+   * Executor used by p2p reader and p2p handshaker threads.
+   */
+  private final Executor p2pReaderThreadPool;
+  /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */
+  private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+  
+  private final SocketCloser socketCloser;
+  
   /**
    * The most recent instance to be created
    * 
@@ -202,11 +219,41 @@ public class ConnectionTable  {
     this.threadOrderedConnMap = new ThreadLocal();
     this.threadConnMaps = new ArrayList();
     this.threadConnectionMap = new ConcurrentHashMap();
+    this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
+    this.socketCloser = new SocketCloser();
   /*  NOMUX: if (TCPConduit.useNIO) {
       inputMuxManager = new InputMuxManager(this);
       inputMuxManager.start(c.logger);
     }*/
   }
+  
+  private Executor createThreadPoolForIO(boolean conserveSockets) { 
+    Executor executor = null; 
+    final ThreadGroup connectionRWGroup = LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
+    if (conserveSockets) { 
+      executor = new Executor() { 
+        @Override 
+        public void execute(Runnable command) { 
+          Thread th = new Thread(connectionRWGroup, command); 
+          th.setDaemon(true); 
+          th.start(); 
+        } 
+      }; 
+    } 
+    else { 
+      BlockingQueue synchronousQueue = new SynchronousQueue(); 
+      ThreadFactory tf = new ThreadFactory() { 
+        public Thread newThread(final Runnable command) { 
+          Thread thread = new Thread(connectionRWGroup, command); 
+          thread.setDaemon(true); 
+          return thread; 
+        } 
+      }; 
+      executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME, 
+          TimeUnit.SECONDS, synchronousQueue, tf); 
+    } 
+    return executor; 
+  } 
 
   /** conduit sends connected() after establishing the server socket */
 //   protected void connected() {
@@ -715,6 +762,14 @@ public class ConnectionTable  {
         this.threadConnMaps.clear();
       }
     }
+    {
+      Executor localExec = this.p2pReaderThreadPool;
+      if (localExec != null) {
+        if (localExec instanceof ExecutorService) {
+          ((ExecutorService)localExec).shutdown();
+        }
+      }
+    }
     closeReceivers(false);
     
     Map m = (Map)this.threadOrderedConnMap.get();
@@ -724,8 +779,16 @@ public class ConnectionTable  {
         m.clear();
       }        
     }
+    this.socketCloser.close();
   }
 
+  public void executeCommand(Runnable runnable) { 
+    Executor local = this.p2pReaderThreadPool;
+    if (local != null) {
+      local.execute(runnable);
+    }
+  }
+  
   /**
    * Close all receiving threads.  This is used during shutdown and is also
    * used by a test hook that makes us deaf to incoming messages.
@@ -800,11 +863,20 @@ public class ConnectionTable  {
     }
 
     if (needsRemoval) {
+      InternalDistributedMember remoteAddress = null;
       synchronized (this.orderedConnectionMap) {
-           closeCon(reason, this.orderedConnectionMap.remove(stub));
+        Object c = this.orderedConnectionMap.remove(stub);
+        if (c instanceof Connection) {
+          remoteAddress = ((Connection) c).getRemoteAddress();
+        }
+        closeCon(reason, c);
       }
       synchronized (this.unorderedConnectionMap) {
-         closeCon(reason, this.unorderedConnectionMap.remove(stub));
+        Object c = this.unorderedConnectionMap.remove(stub);
+        if (remoteAddress == null && (c instanceof Connection)) {
+          remoteAddress = ((Connection) c).getRemoteAddress();
+        }
+        closeCon(reason, c);
       }
 
       {
@@ -813,8 +885,13 @@ public class ConnectionTable  {
           ArrayList al = (ArrayList)cm.remove(stub);
           if (al != null) {
             synchronized (al) {
-              for (Iterator it=al.iterator(); it.hasNext();)
-                closeCon(reason, it.next());
+              for (Iterator it=al.iterator(); it.hasNext();) {
+                Object c = it.next();
+                if (remoteAddress == null && (c instanceof Connection)) {
+                  remoteAddress = ((Connection) c).getRemoteAddress();
+                }
+                closeCon(reason, c);
+              }
               al.clear();
             }
           }
@@ -867,9 +944,17 @@ public class ConnectionTable  {
       if (notifyDisconnect) {
         owner.getMemberForStub(stub, false);
       }
+      
+      if (remoteAddress != null) {
+        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
+      }
     }
   }
   
+  SocketCloser getSocketCloser() {
+    return this.socketCloser;
+  }
+  
   /** check to see if there are still any receiver threads for the given end-point */
   protected boolean hasReceiversFor(Stub endPoint) {
     synchronized (this.receivers) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
new file mode 100644
index 0000000..0b66ec5
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
@@ -0,0 +1,180 @@
+package com.gemstone.gemfire.internal;
+
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import dunit.DistributedTestCase;
+import dunit.DistributedTestCase.WaitCriterion;
+
+/**
+ * Tests the default SocketCloser.
+ */
+@Category(UnitTest.class)
+public class SocketCloserJUnitTest {
+
+  private SocketCloser socketCloser;
+  
+  @Before
+  public void setUp() throws Exception {
+    this.socketCloser = createSocketCloser();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.socketCloser.close();
+  }
+  
+  private Socket createClosableSocket() {
+    return new Socket();
+  }
+
+  protected SocketCloser createSocketCloser() {
+    return new SocketCloser();
+  }
+  
+  /**
+   * Test that close requests are async.
+   */
+  @Test
+  public void testAsync() {
+    final CountDownLatch cdl = new CountDownLatch(1);
+    final AtomicInteger waitingToClose = new AtomicInteger(0);
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          waitingToClose.incrementAndGet();
+          cdl.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    };
+    
+    final int SOCKET_COUNT = 100;
+    final Socket[] aSockets = new Socket[SOCKET_COUNT];
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      aSockets[i] = createClosableSocket();
+    }
+    // Schedule 100 sockets for async close.
+    // They should all be stuck on cdl.
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      this.socketCloser.asyncClose(aSockets[i], "A", r);
+    }
+    // Make sure the sockets have not been closed
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      assertEquals(false, aSockets[i].isClosed());
+    }
+    final Socket[] bSockets = new Socket[SOCKET_COUNT];
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      bSockets[i] = createClosableSocket();
+    }
+    // Schedule 100 sockets for async close.
+    // They should all be stuck on cdl.
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      this.socketCloser.asyncClose(bSockets[i], "B", r);
+    }
+    // Make sure the sockets have not been closed
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      assertEquals(false, bSockets[i].isClosed());
+    }
+    // close the socketCloser first to verify that the sockets
+    // that have already been scheduled will still be closed.
+    this.socketCloser.releaseResourcesForAddress("A");
+    this.socketCloser.close();
+    // Each thread pool (one for A and one for B) has a max of getMaxThreads() threads (8 for the default closer).
+    // So verify that twice that many (one pool's worth each for A and B) are currently waiting on cdl.
+    {
+      final int maxThreads = this.socketCloser.getMaxThreads();
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          return waitingToClose.get() == 2*maxThreads;
+        }
+        public String description() {
+          return "expected " + 2*maxThreads + " waiters but found only " + waitingToClose.get();
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+    }
+    // now count down the latch that allows the sockets to close
+    cdl.countDown();
+    // now all the sockets should get closed; use a wait criterion
+    // since a thread pool is doing the closes
+    {
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          for (int i=0; i < SOCKET_COUNT; i++) {
+            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
+              return false;
+            }
+          }
+          return true;
+        }
+        public String description() {
+          return "one or more sockets did not close";
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+    }
+  }
+  
+  /**
+   * Verify that requesting an asyncClose on an already
+   * closed socket is a noop.
+   */
+  @Test
+  public void testClosedSocket() throws IOException {
+    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        runnableCalled.set(true);
+      }
+    };
+    
+    Socket s = createClosableSocket();
+    s.close();
+    this.socketCloser.asyncClose(s, "A", r);
+    DistributedTestCase.pause(10);
+    assertEquals(false, runnableCalled.get());
+  }
+  
+  /**
+   * Verify that a closed SocketCloser will still close an open socket
+   */
+  @Test
+  public void testClosedSocketCloser() {
+    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        runnableCalled.set(true);
+      }
+    };
+    
+    final Socket s = createClosableSocket();
+    this.socketCloser.close();
+    this.socketCloser.asyncClose(s, "A", r);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        return runnableCalled.get() && s.isClosed(); 
+      }
+      public String description() {
+        return "runnable was not called or socket was not closed";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
new file mode 100644
index 0000000..9e63743
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
@@ -0,0 +1,22 @@
+package com.gemstone.gemfire.internal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Tests SocketCloser with a wait time. The default SocketCloser does not wait.
+ * This test configures a closer much like the one used by CacheClientNotifier.
+ */
+@Category(UnitTest.class)
+public class SocketCloserWithWaitJUnitTest extends SocketCloserJUnitTest {
+  @Override
+  protected SocketCloser createSocketCloser() {
+    return new SocketCloser(
+        SocketCloser.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS,
+        1, // max threads
+        1, TimeUnit.NANOSECONDS);
+  }
+}


[11/26] incubator-geode git commit: GEODE-320: Close the cache in DistributedTestCase tear down

Posted by bs...@apache.org.
GEODE-320: Close the cache in DistributedTestCase tear down

Close the cache in distributed test case teardown to deal with bad
behaving tests that aren't cleaning up their cache.

Adding a log that shows us what tests ran previously to help debug
in situations where the progress file is not available.

I also found that RegionTestCase was previously not cleaning up the
cache at all, so I added a missing call to super.tearDown.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4e65f0c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4e65f0c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4e65f0c7

Branch: refs/heads/feature/GEODE-77
Commit: 4e65f0c72d30ca24c582543a62dc72b064e9d448
Parents: 391a93d
Author: Dan Smith <up...@apache.org>
Authored: Wed Sep 9 13:21:06 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Mon Sep 14 13:54:27 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/cache30/CacheTestCase.java | 27 ++--------
 .../gemfire/cache30/RegionTestCase.java         |  1 +
 .../test/java/dunit/DistributedTestCase.java    | 56 ++++++++++++++++++++
 3 files changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4e65f0c7/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/CacheTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/CacheTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/CacheTestCase.java
index e318370..951c985 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/CacheTestCase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/CacheTestCase.java
@@ -16,6 +16,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
+import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
@@ -453,29 +454,7 @@ public abstract class CacheTestCase extends DistributedTestCase {
   protected synchronized static void remoteTearDown() {
     try {
       DistributionMessageObserver.setInstance(null);
-      if (cache != null && !cache.isClosed()) {
-        //try to destroy the root regions first so that
-        //we clean up any persistent files.
-        for (Iterator itr = cache.rootRegions().iterator(); itr.hasNext();) {
-          Region root = (Region)itr.next();
-//          String name = root.getName();
-          //for colocated regions you can't locally destroy a partitioned
-          //region.
-	  if(root.isDestroyed() || root instanceof HARegion || root instanceof PartitionedRegion) {
-            continue;
-          }
-          try {
-            root.localDestroyRegion("teardown");
-          }
-          catch (VirtualMachineError e) {
-            SystemFailure.initiateFailure(e);
-            throw e;
-          }
-          catch (Throwable t) {
-            getLogWriter().error(t);
-          }
-        }
-      }
+      destroyRegions(cache);
     }
     finally {
       try {
@@ -497,7 +476,7 @@ public abstract class CacheTestCase extends DistributedTestCase {
       getLogWriter().error("Error cleaning disk dirs", e);
     }
   }
-  
+
   /**
    * Returns a region with the given name and attributes
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4e65f0c7/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/RegionTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/RegionTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/RegionTestCase.java
index 4b20b42..81c8afb 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/RegionTestCase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/RegionTestCase.java
@@ -102,6 +102,7 @@ public abstract class RegionTestCase extends CacheTestCase {
   }
   
   public void tearDown2() throws Exception {
+    super.tearDown2();
     cleanup();
     invokeInEveryVM(getClass(), "cleanup");
     /*for (int h = 0; h < Host.getHostCount(); h++) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4e65f0c7/gemfire-core/src/test/java/dunit/DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/DistributedTestCase.java b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
index 2e7ac03..c78510a 100755
--- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java
+++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
@@ -16,6 +16,7 @@ import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -24,13 +25,17 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.springframework.data.gemfire.support.GemfireCache;
 
 import junit.framework.TestCase;
 
+import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
 import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
 import com.gemstone.gemfire.cache.query.QueryTestUtils;
@@ -54,8 +59,10 @@ import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.admin.ClientStatsManager;
 import com.gemstone.gemfire.internal.cache.DiskStoreObserver;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.HARegion;
 import com.gemstone.gemfire.internal.cache.InitialImageOperation;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -92,6 +99,7 @@ import dunit.standalone.DUnitLauncher;
 public abstract class DistributedTestCase extends TestCase implements java.io.Serializable {
   private static final Logger logger = LogService.getLogger();
   private static final LogWriterLogger oldLogger = LogWriterLogger.create(logger);
+  private static final LinkedHashSet<String> testHistory = new LinkedHashSet<String>();
 
   private static void setUpCreationStackGenerator() {
     // the following is moved from InternalDistributedSystem to fix #51058
@@ -672,6 +680,7 @@ public abstract class DistributedTestCase extends TestCase implements java.io.Se
    */
   @Override
   public void setUp() throws Exception {
+    logTestHistory();
     setUpCreationStackGenerator();
     testName = getName();
     System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
@@ -691,6 +700,16 @@ public abstract class DistributedTestCase extends TestCase implements java.io.Se
     System.out.println("\n\n[setup] START TEST " + getClass().getSimpleName()+"."+testName+"\n\n");
   }
 
+  /**
+   * Write a message to the log about what tests have run previously. This
+   * makes it easier to figure out if a previous test may have caused problems
+   */
+  private void logTestHistory() {
+    String classname = getClass().getSimpleName();
+    testHistory.add(classname);
+    System.out.println("Previously run tests: " + testHistory);
+  }
+
   public static void perVMSetUp(String name, String defaultDiskStoreName) {
     setTestName(name);
     GemFireCacheImpl.setDefaultDiskStoreName(defaultDiskStoreName);
@@ -767,6 +786,8 @@ public abstract class DistributedTestCase extends TestCase implements java.io.Se
 
 
   private static void cleanupThisVM() {
+    closeCache();
+    
     IpAddress.resolve_dns = true;
     SocketCreator.resolve_dns = true;
     InitialImageOperation.slowImageProcessing = 0;
@@ -794,6 +815,41 @@ public abstract class DistributedTestCase extends TestCase implements java.io.Se
       ex.remove();
     }
   }
+
+  private static void closeCache() {
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    if(cache != null && !cache.isClosed()) {
+      destroyRegions(cache);
+      cache.close();
+    }
+  }
+  
+  protected static final void destroyRegions(Cache cache)
+      throws InternalGemFireError, Error, VirtualMachineError {
+    if (cache != null && !cache.isClosed()) {
+      //try to destroy the root regions first so that
+      //we clean up any persistent files.
+      for (Iterator itr = cache.rootRegions().iterator(); itr.hasNext();) {
+        Region root = (Region)itr.next();
+        //for colocated regions you can't locally destroy a partitioned
+        //region.
+        if(root.isDestroyed() || root instanceof HARegion || root instanceof PartitionedRegion) {
+          continue;
+        }
+        try {
+          root.localDestroyRegion("teardown");
+        }
+        catch (VirtualMachineError e) {
+          SystemFailure.initiateFailure(e);
+          throw e;
+        }
+        catch (Throwable t) {
+          getLogWriter().error(t);
+        }
+      }
+    }
+  }
+  
   
   public static void unregisterAllDataSerializersFromAllVms()
   {


[15/26] incubator-geode git commit: GEODE-171: Disabling ClientServerTimeSyncDUnitTest

Posted by bs...@apache.org.
GEODE-171: Disabling ClientServerTimeSyncDUnitTest

After discussion with Bruce, we may not even have this time sync
protocol after GEODE-77 merges. There's no point in continuing to see
failures from this broken test on develop in the mean time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7bc01129
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7bc01129
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7bc01129

Branch: refs/heads/feature/GEODE-77
Commit: 7bc011293f311a77b44beac13c6d5c91a01abe3d
Parents: 46ec8ae
Author: Dan Smith <up...@apache.org>
Authored: Thu Sep 17 11:56:38 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Thu Sep 17 11:57:55 2015 -0700

----------------------------------------------------------------------
 .../gemfire/cache/ClientServerTimeSyncDUnitTest.java        | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7bc01129/gemfire-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
index d980bc8..492d9a4 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
@@ -24,7 +24,7 @@ public class ClientServerTimeSyncDUnitTest extends CacheTestCase {
     super(name);
   }
 
-  @Ignore("BUg 52327")
+  @Ignore("Bug 52327")
   public void DISABLED_testClientTimeAdvances() {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0); // Server
@@ -96,8 +96,13 @@ public class ClientServerTimeSyncDUnitTest extends CacheTestCase {
       vm1.invoke(CacheTestCase.class, "disconnectFromDS");
     }
   }
+  
+  public void testNothing() {
+    // place-holder to keep dunit runner from barfing
+  }
 
-  public void testClientTimeSlowsDown() {
+  @Ignore("Failing inconsistently")
+  public void DISABLED_testClientTimeSlowsDown() {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0); // Server
     VM vm1 = host.getVM(1); // Client


[02/26] incubator-geode git commit: GEODE-295 - Expecting a suspect string in BridgeMembershipDUnitTest

Posted by bs...@apache.org.
GEODE-295 - Expecting a suspect string in BridgeMembershipDUnitTest

testBridgeMembershipEventsInClient already had some code to expect the
suspect string, but it was writing to a weirdly constructed logger, so I
think it was not actually taking effect.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/141512c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/141512c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/141512c5

Branch: refs/heads/feature/GEODE-77
Commit: 141512c5cf5e8f1896e05bff064d6d11661e29eb
Parents: 0c13b4d
Author: Dan Smith <up...@apache.org>
Authored: Tue Sep 1 09:07:16 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Tue Sep 1 09:07:16 2015 -0700

----------------------------------------------------------------------
 .../cache30/BridgeMembershipDUnitTest.java      | 38 ++++++--------------
 1 file changed, 10 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/141512c5/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/BridgeMembershipDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/BridgeMembershipDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/BridgeMembershipDUnitTest.java
index 7f6af30..ca7a439 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/BridgeMembershipDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/BridgeMembershipDUnitTest.java
@@ -719,6 +719,7 @@ public class BridgeMembershipDUnitTest extends BridgeTestCase {
    * crashes or departs gracefully, the client will detect this as a crash.
    */
   public void testBridgeMembershipEventsInClient() throws Exception {
+    addExpectedException("IOException");
     final boolean[] fired = new boolean[3];
     final DistributedMember[] member = new DistributedMember[3];
     final String[] memberId = new String[3];
@@ -876,35 +877,16 @@ public class BridgeMembershipDUnitTest extends BridgeTestCase {
     assertFalse(isClient[CRASHED]);
     resetArraysForTesting(fired, member, memberId, isClient);
 
-//    String expected = "dead server list" +
-//                      "||live server list" +
-//                      "||java.io.IOException";
-    String expected = "java.io.IOException";
-    String addExpected = 
-      "<ExpectedException action=add>" + expected + "</ExpectedException>";
-    String removeExpected = 
-      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-      
-    LogWriter bgexecLogger =
-          new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
-    bgexecLogger.info(addExpected);
-    getLogWriter().info(addExpected);
-    try {
-      vm0.invoke(new SerializableRunnable("Stop BridgeServer") {
-        public void run() {
-          getLogWriter().info("[testBridgeMembershipEventsInClient] Stop BridgeServer");
-          stopBridgeServers(getCache());
-        }
-      });
-      synchronized(listener) {
-        if (!fired[JOINED] && !fired[CRASHED]) {
-          listener.wait(60 * 1000);
-        }
+    vm0.invoke(new SerializableRunnable("Stop BridgeServer") {
+      public void run() {
+        getLogWriter().info("[testBridgeMembershipEventsInClient] Stop BridgeServer");
+        stopBridgeServers(getCache());
+      }
+    });
+    synchronized(listener) {
+      if (!fired[JOINED] && !fired[CRASHED]) {
+        listener.wait(60 * 1000);
       }
-    }
-    finally {
-      bgexecLogger.info(removeExpected);
-      getLogWriter().info(removeExpected);
     }
     
     getLogWriter().info("[testBridgeMembershipEventsInClient] assert client detected server departure");


[25/26] incubator-geode git commit: GEODE-363: Disable unreliable test

Posted by bs...@apache.org.
GEODE-363: Disable unreliable test


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e1c2d8e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e1c2d8e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e1c2d8e7

Branch: refs/heads/feature/GEODE-77
Commit: e1c2d8e7add06b7e8ce6995adc6c1c416b251be6
Parents: 8547695
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 25 11:45:54 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 25 11:45:54 2015 -0700

----------------------------------------------------------------------
 .../gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e1c2d8e7/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
index 5fb2949..3330574 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
@@ -367,7 +367,7 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase {
    * most of the time the data is not found in memory and queue and 
    * is fetched from HDFS
    */
-  public void testPutAllAndGetFromHDFS() {
+  public void _testPutAllAndGetFromHDFS() {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);


[03/26] incubator-geode git commit: GEODE-306: support printing of off-heap compressed nodes

Posted by bs...@apache.org.
GEODE-306: support printing of off-heap compressed nodes

Previously, when a chunk of off-heap compressed memory tried to
get the type of the data stored in the chunk, it needed to
decompress the data. Since it did not know which decompressor
to use, it threw an UnsupportedOperationException.
Now, for a compressed chunk, it returns the DataType as either
"compressed object of size xxx" or "compressed byte[xxx]".
It does not perform a decompression.
Unit test coverage was added.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f0f61766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f0f61766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f0f61766

Branch: refs/heads/feature/GEODE-77
Commit: f0f61766d0fa9737369b6910d94d1668e011299b
Parents: 141512c
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 1 16:40:43 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Sep 2 09:41:06 2015 -0700

----------------------------------------------------------------------
 .../offheap/SimpleMemoryAllocatorImpl.java      |  8 +++-
 .../offheap/OffHeapValidationJUnitTest.java     | 45 +++++++++++++++-----
 2 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0f61766/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
index 29319e0..7cf1656 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
@@ -3341,7 +3341,13 @@ public final class SimpleMemoryAllocatorImpl implements MemoryAllocator, MemoryI
       }
       if (!isSerialized()) {
         // byte array
-        return "byte[" + ((Chunk)this.block).getDataSize() + "]";
+        if (isCompressed()) {
+          return "compressed byte[" + ((Chunk)this.block).getDataSize() + "]";
+        } else {
+          return "byte[" + ((Chunk)this.block).getDataSize() + "]";
+        }
+      } else if (isCompressed()) {
+        return "compressed object of size " + ((Chunk)this.block).getDataSize();
       }
       //Object obj = EntryEventImpl.deserialize(((Chunk)this.block).getRawBytes());
       byte[] bytes = ((Chunk)this.block).getRawBytes();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0f61766/gemfire-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
index 7991872..1b40f2a 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/offheap/OffHeapValidationJUnitTest.java
@@ -5,10 +5,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -33,9 +36,13 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.CacheFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.compression.SnappyCompressor;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
@@ -87,7 +94,7 @@ public class OffHeapValidationJUnitTest {
   }
   
   @Test
-  public void testMemoryInspection() {
+  public void testMemoryInspection() throws IOException {
     // validate initial state
     MemoryAllocator allocator = this.cache.getOffHeapStore();
     assertNotNull(allocator);
@@ -113,6 +120,7 @@ public class OffHeapValidationJUnitTest {
     
     // create off-heap region
     Region<Object, Object> region = this.cache.createRegionFactory(getRegionShortcut()).setOffHeap(true).create(getRegionName());
+    Region<Object, Object> compressedRegion = this.cache.createRegionFactory(getRegionShortcut()).setOffHeap(true).setCompressor(SnappyCompressor.getDefaultInstance()).create(getRegionName()+"Compressed");
     
     // perform some ops
     List<ExpectedValues> expected = new ArrayList<ExpectedValues>();
@@ -120,8 +128,10 @@ public class OffHeapValidationJUnitTest {
     // Chunk.OFF_HEAP_HEADER_SIZE + 4 ?
     
     putString(region, expected);
+    putCompressedString(compressedRegion, expected);
     putDate(region, expected);
     putByteArray(region, expected);
+    putCompressedByteArray(compressedRegion, expected);
     putByteArrayArray(region, expected);
     putShortArray(region, expected);
     putStringArray(region, expected);
@@ -189,8 +199,11 @@ public class OffHeapValidationJUnitTest {
         assertTrue(obj instanceof String);
         assertEquals("this is a string", (String)obj);
       }
-      if (values.dataType.contains("[")) { //for (int j = 0; j < ((byte[])values.dataValue).length; j++) {
-        // TODO
+      if ((values.dataType.contains("byte [") && values.dataType.lastIndexOf('[') == values.dataType.indexOf('[')) || values.dataType.startsWith("compressed")) {
+        assertTrue("for dataType=" + values.dataType + " expected " + Arrays.toString((byte[])values.dataValue) + " but was " + Arrays.toString((byte[])block.getDataValue()),
+            Arrays.equals((byte[])values.dataValue, (byte[])block.getDataValue()));
+      } else if (values.dataType.contains("[")) {
+        // TODO: multiple dimension arrays or non-byte arrays
       } else if (values.dataValue instanceof Collection) {
         int diff = joint((Collection<?>)values.dataValue, (Collection<?>)block.getDataValue());
         assertEquals(i + ":" + values.dataType, 0, diff);
@@ -216,14 +229,6 @@ public class OffHeapValidationJUnitTest {
     
   }
   
-  @Test
-  public void testCompaction() {
-    // create fragmented state
-    // validate fragmented
-    // perform compaction
-    // validate freed fragments
-  }
-  
   /**
    * Returns -1 if c1 is missing an element in c2, 1 if c2 is missing an element
    * in c1, or 0 is they contain the exact same elements.
@@ -289,6 +294,17 @@ public class OffHeapValidationJUnitTest {
     expected.add(new ExpectedValues(value, value.length()*2, "java.lang.String", -1, getMemoryAddress(region, key), 1, 0, false, true));
   }
   
+  private void putCompressedString(Region<Object, Object> region, List<ExpectedValues> expected) throws IOException {
+    String key = "keyString";
+    String value = "this is a string";
+    region.put(key, value);
+    HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+    DataSerializer.writeObject(value, hdos);
+    byte[] uncompressedBytes = hdos.toByteArray();
+    byte[] expectedValue = SnappyCompressor.getDefaultInstance().compress(uncompressedBytes);
+    expected.add(new ExpectedValues(expectedValue, 32, "compressed object of size " + expectedValue.length, -1, getMemoryAddress(region, key), 1, 0, true, true));
+  }
+
   private void putDate(Region<Object, Object> region, List<ExpectedValues> expected) {
     String key = "keyDate";
     Date value = new Date();
@@ -302,6 +318,13 @@ public class OffHeapValidationJUnitTest {
     region.put(key, value);
     expected.add(new ExpectedValues(value, 24, "byte[10]", -1, getMemoryAddress(region, key), 1, 0, false, false));
   }
+  private void putCompressedByteArray(Region<Object, Object> region, List<ExpectedValues> expected) throws IOException {
+    String key = "keyByteArray";
+    byte[] value = new byte[10];
+    region.put(key, value);
+    byte[] expectedValue = SnappyCompressor.getDefaultInstance().compress(value);
+    expected.add(new ExpectedValues(expectedValue, 24, "compressed byte[" + expectedValue.length + "]", -1, getMemoryAddress(region, key), 1, 0, true, false));
+  }
   
   private void putByteArrayArray(Region<Object, Object> region, List<ExpectedValues> expected) {
     String key = "keyByteArrayArray";


[13/26] incubator-geode git commit: GEODE-334: A NullPointerException can sometimes be thrown from CacheServerBridge getUniqueClientIds

Posted by bs...@apache.org.
GEODE-334: A NullPointerException can sometimes be thrown from CacheServerBridge getUniqueClientIds

Incorporated changes from GemFire


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6fe66f32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6fe66f32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6fe66f32

Branch: refs/heads/feature/GEODE-77
Commit: 6fe66f32c008d8d99a5e94eee5fa979296f1ad5f
Parents: 2c99b9e
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Sep 16 16:43:55 2015 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Sep 16 16:43:55 2015 -0700

----------------------------------------------------------------------
 .../gemfire/management/internal/beans/CacheServerBridge.java       | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6fe66f32/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java
index 65e2c5a..012b8ec 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java
@@ -350,11 +350,13 @@ public class CacheServerBridge extends ServerBridge{
       }
       for (ServerConnection conn : serverConnections) {
         ClientProxyMembershipID clientId = conn.getProxyID();
+        if (clientId != null) { // Check added to fix bug 51987
         if (uniqueIds.get(clientId.getDSMembership()) == null) {
           ClientConnInfo clientConInfo = new ClientConnInfo(conn.getProxyID(), conn.getSocketHost(),
               conn.getSocketPort(), false);
           uniqueIds.put(clientId.getDSMembership(), clientConInfo);
         }
+        }
       }
     }
 


[22/26] incubator-geode git commit: GEODE-365: HotSpot SIGSEGV attempting to compile javax.print.attribute.EnumSyntax::readResolve with JDK 1.8.0_45 Incorporated changes from GemFire

Posted by bs...@apache.org.
GEODE-365: HotSpot SIGSEGV attempting to compile javax.print.attribute.EnumSyntax::readResolve with JDK 1.8.0_45
Incorporated changes from GemFire


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8639b864
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8639b864
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8639b864

Branch: refs/heads/feature/GEODE-77
Commit: 8639b864b2d04829abdc8b6f3e18273faecdcdfa
Parents: 3d648db
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Sep 23 14:13:18 2015 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Sep 23 14:13:18 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/cache/EvictionAction.java  |  3 +-
 .../gemfire/cache/EvictionAlgorithm.java        |  3 +-
 .../gemfire/cache/Bug52289JUnitTest.java        | 81 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8639b864/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAction.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAction.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAction.java
index c4bb350..ada568f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAction.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAction.java
@@ -49,7 +49,8 @@ public final class EvictionAction extends EnumSyntax
     return stringTable;
   }
     
-  private static final EvictionAction[] enumValueTable = {
+  //TODO post Java 1.8.0u45 uncomment final flag, see JDK-8076152
+  private static /*final*/ EvictionAction[] enumValueTable = {
     NONE,
     LOCAL_DESTROY,
     OVERFLOW_TO_DISK

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8639b864/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAlgorithm.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAlgorithm.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAlgorithm.java
index 33cca0e..248bd17 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAlgorithm.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/EvictionAlgorithm.java
@@ -72,7 +72,8 @@ public final class EvictionAlgorithm extends EnumSyntax
     return stringTable;
   }
     
-  private static final EvictionAlgorithm[] enumValueTable = {
+  //TODO post Java 1.8.0u45 uncomment final flag, see JDK-8076152
+  private static /*final*/ EvictionAlgorithm[] enumValueTable = {
     NONE,
     LRU_ENTRY,
     LRU_HEAP,

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8639b864/gemfire-core/src/test/java/com/gemstone/gemfire/cache/Bug52289JUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/Bug52289JUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/Bug52289JUnitTest.java
new file mode 100644
index 0000000..9efdb02
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/Bug52289JUnitTest.java
@@ -0,0 +1,81 @@
+/*=========================================================================
+ * Copyright (c) 2002-2015 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Test case for Trac <a
+ * href="https://svn.gemstone.com/trac/gemfire/ticket/52289">#52289</a>.
+ * 
+ * Asserts fixes for bug JDK-8076152 in JDK 1.8.0u20 to 1.8.0.u45.
+ * http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8076152
+ * 
+ * The JVM crashes when hotspot compiling a method that uses an array consisting
+ * of objects of a base class when different child classes is used as actual
+ * instance objects AND when the array is constant (declared final). The crash
+ * occurs during process of the aaload byte code.
+ * 
+ * This test and its corrections can be removed after the release of JDK
+ * 1.8.0u60 if we choose to not support 1.8.0u20 - 1.8.0u45 inclusive.
+ * 
+ * @author jbarrett@pivotal.io
+ *
+ * @since 8.2
+ * 
+ */
+@Category(UnitTest.class)
+public class Bug52289JUnitTest {
+
+  @Test
+  public void test() throws IOException, ClassNotFoundException {
+    // Iterate enough to cause JIT to compile
+    // javax.print.attribute.EnumSyntax::readResolve
+    for (int i = 0; i < 100_000; i++) {
+      // Must execute two or more subclasses with final static arrays of
+      // different types.
+      doEvictionAlgorithm();
+      doEvictionAction();
+    }
+  }
+
+  protected void doEvictionAlgorithm() throws IOException, ClassNotFoundException {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(EvictionAlgorithm.NONE);
+    oos.close();
+
+    final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    final ObjectInputStream ois = new ObjectInputStream(bais);
+    ois.readObject();
+    ois.close();
+  }
+
+  protected void doEvictionAction() throws IOException, ClassNotFoundException {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(EvictionAction.NONE);
+    oos.close();
+
+    final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    final ObjectInputStream ois = new ObjectInputStream(bais);
+    ois.readObject();
+    ois.close();
+  }
+
+}
\ No newline at end of file


[18/26] incubator-geode git commit: GEODE-343 Fix race in ConcurrentMapOpsDUnitTest.

Posted by bs...@apache.org.
GEODE-343 Fix race in ConcurrentMapOpsDUnitTest.

Added a new listener to clients that keeps track of the initial creates;
the test now waits for this listener to receive all initial creates.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8a5920d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8a5920d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8a5920d5

Branch: refs/heads/feature/GEODE-77
Commit: 8a5920d51346b661e253bbb33d392f92b83184eb
Parents: 71e8dc8
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Wed Sep 23 07:30:45 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Wed Sep 23 09:29:21 2015 -0700

----------------------------------------------------------------------
 .../cache/ConcurrentMapOpsDUnitTest.java        | 69 ++++++++++++++++++--
 1 file changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a5920d5/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConcurrentMapOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConcurrentMapOpsDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConcurrentMapOpsDUnitTest.java
index 72db3f1..a34f6d2 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConcurrentMapOpsDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConcurrentMapOpsDUnitTest.java
@@ -16,6 +16,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.AssertionFailedError;
 
@@ -186,6 +187,17 @@ public class ConcurrentMapOpsDUnitTest extends CacheTestCase {
       fail("should not be called.  Event="+event);
     }
   }
+
+  static class InitialCreatesListener extends AbstractConcMapOpsListener {
+    AtomicInteger numCreates = new AtomicInteger();
+    @Override
+    void validate(EntryEvent event) {
+      if (!event.getOperation().isCreate()) {
+        fail("expected only create events");
+      }
+      numCreates.incrementAndGet();
+    }
+  }
   /**
    * @param name
    */
@@ -205,18 +217,67 @@ public class ConcurrentMapOpsDUnitTest extends CacheTestCase {
     createClientRegionWithRI(client1, port1, true);
     createClientRegionWithRI(client2, port2, true);
 
-    
+    SerializableCallable addListenerToClientForInitialCreates = new SerializableCallable() {
+      public Object call() throws Exception {
+        Region r = getCache().getRegion(REP_REG_NAME);
+        r.getAttributesMutator().addCacheListener(new InitialCreatesListener());
+        Region pr = getCache().getRegion(PR_REG_NAME);
+        pr.getAttributesMutator().addCacheListener(new InitialCreatesListener());
+        return null;
+      }
+    };
+    client1.invoke(addListenerToClientForInitialCreates);
+    client2.invoke(addListenerToClientForInitialCreates);
+
     vm1.invoke(new SerializableCallable() {
       public Object call() throws Exception {
         Region<Integer, String> r = getGemfireCache().getRegion(REP_REG_NAME);
         Region<Integer, String> pr = getGemfireCache().getRegion(PR_REG_NAME);
-        for (int i=0; i<MAX_ENTRIES; i++) {
-          r.put(i, "value"+i);
-          pr.put(i, "value"+i);
+        for (int i = 0; i < MAX_ENTRIES; i++) {
+          r.put(i, "value" + i);
+          pr.put(i, "value" + i);
         }
         return null;
       }
     });
+
+    SerializableCallable waitForInitialCreates = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        Region<Integer, String> r = getGemfireCache().getRegion(REP_REG_NAME);
+        Region<Integer, String> pr = getGemfireCache().getRegion(PR_REG_NAME);
+        waitForCreates(r);
+        waitForCreates(pr);
+        return null;
+      }
+      private void waitForCreates(Region region) {
+        CacheListener[] listeners = region.getAttributes().getCacheListeners();
+        boolean listenerFound = false;
+        for (CacheListener listener : listeners) {
+          if (listener instanceof InitialCreatesListener) {
+            listenerFound = true;
+            final InitialCreatesListener initialCreatesListener = (InitialCreatesListener) listener;
+            WaitCriterion wc = new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return initialCreatesListener.numCreates.get() == MAX_ENTRIES;
+              }
+              @Override
+              public String description() {
+                return "Client expected to get "+MAX_ENTRIES+" creates, but got "+initialCreatesListener.numCreates.get();
+              }
+            };
+            DistributedTestCase.waitForCriterion(wc, 30*1000, 500, true);
+          }
+        }
+        if (!listenerFound) {
+          fail("Client listener should have been found");
+        }
+      }
+    };
+    client1.invoke(waitForInitialCreates);
+    client2.invoke(waitForInitialCreates);
+
     SerializableCallable addListener = new SerializableCallable() {
       public Object call() throws Exception {
         Region r = getCache().getRegion(REP_REG_NAME);


[24/26] incubator-geode git commit: Revert "Change FindBugs reportLevel to 'low' so all bugs are reported"

Posted by bs...@apache.org.
Revert "Change FindBugs reportLevel to 'low' so all bugs are reported"

This reverts commit 6444722fbb92a1a9c8a2fbc65cd25d07d0ba300e, which was
mistakenly checked into develop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/85476953
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/85476953
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/85476953

Branch: refs/heads/feature/GEODE-77
Commit: 85476953a884659229b19c65ea1e1a24f4067232
Parents: 6444722
Author: Mark Bretl <mb...@pivotal.io>
Authored: Thu Sep 24 15:38:27 2015 -0700
Committer: Mark Bretl <mb...@pivotal.io>
Committed: Thu Sep 24 15:38:27 2015 -0700

----------------------------------------------------------------------
 build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85476953/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index eda7642..d7b4965 100755
--- a/build.gradle
+++ b/build.gradle
@@ -160,7 +160,7 @@ subprojects {
         dep.transitive = true
       }
       findbugs.effort = 'max'
-      findbugs.reportLevel = 'low'
+      findbugs.reportLevel = 'high'
     }
  
     tasks.withType(FindBugs) {


[12/26] incubator-geode git commit: GEODE-333: Indexes sometimes are no longer used after a rebalance

Posted by bs...@apache.org.
GEODE-333: Indexes sometimes are no longer used after a rebalance

Incorporated changes from GemFire


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2c99b9e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2c99b9e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2c99b9e6

Branch: refs/heads/feature/GEODE-77
Commit: 2c99b9e6dbbae1f7fa169f43b3708328cb4d0ebf
Parents: 4e65f0c
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Sep 16 14:35:37 2015 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Sep 16 14:47:06 2015 -0700

----------------------------------------------------------------------
 .../index/FunctionalIndexCreationHelper.java    |  1 -
 .../query/internal/index/PartitionedIndex.java  | 16 +++++-
 .../query/internal/index/IndexUseJUnitTest.java | 52 ++++++++++++++++++++
 3 files changed, 67 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2c99b9e6/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/FunctionalIndexCreationHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/FunctionalIndexCreationHelper.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/FunctionalIndexCreationHelper.java
index a2b48c3..e3f5209 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/FunctionalIndexCreationHelper.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/FunctionalIndexCreationHelper.java
@@ -581,7 +581,6 @@ class FunctionalIndexCreationHelper extends IndexCreationHelper {
       }
     }
     catch (Exception e) {
-      e.printStackTrace();
       throw new IndexInvalidException(e);
     }
     return retValues;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2c99b9e6/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/PartitionedIndex.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/PartitionedIndex.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/PartitionedIndex.java
index b89a738..d41f706 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/PartitionedIndex.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/PartitionedIndex.java
@@ -121,6 +121,9 @@ public class PartitionedIndex extends AbstractIndex
       List<Index> indexes = this.bucketIndexes.get(r);
       if(indexes != null) {
         indexes.remove(index);
+        if (indexes.isEmpty()) {
+          this.bucketIndexes.remove(r);
+        }
       }
     }
   }
@@ -187,6 +190,17 @@ public class PartitionedIndex extends AbstractIndex
     return index;
   }
   
+  protected Map.Entry<Region,List<Index>> getFirstBucketIndex()
+  {
+    Map.Entry<Region,List<Index>> firstIndexEntry = null;
+    synchronized(this.bucketIndexes) {
+      if (this.bucketIndexes.size() > 0) {
+        firstIndexEntry = this.bucketIndexes.entrySet().iterator().next();
+      }
+    }
+    return firstIndexEntry;
+  }
+
   /**
    * Returns the type of index this partitioned index represents.
    * @return  indexType type of partitioned index.
@@ -242,7 +256,7 @@ public class PartitionedIndex extends AbstractIndex
         throw new QueryInvocationTargetException("Bucket not found for the id :" + bId);
       }
       IndexManager im = IndexUtils.getIndexManager(bukRegion, true); 
-      if (im.getIndex(indexName) == null) { 
+      if (im != null && im.getIndex(indexName) == null) { 
         try {
           if (pr.getCache().getLogger().fineEnabled()) {
             pr.getCache().getLogger().fine("Verifying index presence on bucket region. " +

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2c99b9e6/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/IndexUseJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/IndexUseJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/IndexUseJUnitTest.java
index 81bec89..8cb9ded 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/IndexUseJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/IndexUseJUnitTest.java
@@ -29,6 +29,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.query.CacheUtils;
 import com.gemstone.gemfire.cache.query.Index;
@@ -1740,6 +1742,51 @@ public class IndexUseJUnitTest
       }
     }
 
+    @Test
+    public void testBug52444() throws Exception {
+        // Create partitioned region
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        AttributesFactory af = new AttributesFactory();
+        af.setPartitionAttributes(paf.create());
+        Region region = CacheUtils.createRegion("testBug52444", af.create(), false);
+
+        // Add index
+        PartitionedIndex index = (PartitionedIndex) qs.createIndex("statusIndex", "status", region.getFullPath());
+
+        // Do puts
+        for (int i=0; i<200; i++) {
+          region.put(i, new Portfolio(i));
+        }
+        
+        // Initialize query observer
+        QueryObserverImpl observer = new QueryObserverImpl();
+        QueryObserverHolder.setInstance(observer);
+        
+        // Create and run query
+        Query query = qs.newQuery("SELECT * FROM " + region.getFullPath() + " where status = 'active'");
+        query.execute();
+
+        // Verify index was used
+        assertTrue(observer.isIndexesUsed);
+
+        // Get the first index entry in the PartitionedIndex bucketIndexes and delete the index from it (to simulate what happens when a bucket is moved)
+        Map.Entry<Region,List<Index>> firstIndexEntry = index.getFirstBucketIndex();
+        assertTrue(!firstIndexEntry.getValue().isEmpty());
+        index.removeFromBucketIndexes(firstIndexEntry.getKey(), firstIndexEntry.getValue().iterator().next());
+        
+        // Verify the index was removed from the entry and the entry was removed from the bucket indexes
+        assertTrue(firstIndexEntry.getValue().isEmpty());      
+        Map.Entry<Region,List<Index>> nextFirstIndexEntry = index.getFirstBucketIndex();
+        assertTrue(!nextFirstIndexEntry.getValue().isEmpty());
+        
+        // Run query again
+        observer.reset();
+        query.execute();
+        
+        // Verify index was still used
+        assertTrue(observer.isIndexesUsed);
+      }
+
   class QueryObserverImpl extends QueryObserverAdapter
   {
     boolean isIndexesUsed = false;
@@ -1755,6 +1802,11 @@ public class IndexUseJUnitTest
         isIndexesUsed = true;
       }
     }
+    
+    public void reset() {
+      this.isIndexesUsed = false;
+      this.indexesUsed.clear();
+    }
   }
   
   public class RangeIndexTestHook implements TestHook {


[19/26] incubator-geode git commit: GEODE-354 Do not deserialize values for displaying/throwing CommitConflictException.

Posted by bs...@apache.org.
GEODE-354 Do not deserialize values for displaying/throwing CommitConflictException.

By default, do not log or throw a CommitConflictException whose message contains the
string representation of the values. Add a system property, gemfire.verboseConflictString;
when it is set, or when DEBUG-level logging is enabled, the values are converted to
strings and included in the CommitConflictException message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/eb896614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/eb896614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/eb896614

Branch: refs/heads/feature/GEODE-77
Commit: eb896614ee1a754ecf46500d471c517dbd0282b2
Parents: 8a5920d
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Tue Sep 22 07:30:30 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Wed Sep 23 09:37:52 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TXEntryState.java     | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eb896614/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
index 0d44686..51e4755 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
@@ -236,6 +236,11 @@ public class TXEntryState implements Releasable
   private transient DistTxThinEntryState distTxThinEntryState;
   
   /**
+   * Use this system property if you need to display/log string values in conflict messages
+   */
+  private static final boolean VERBOSE_CONFLICT_STRING = Boolean.getBoolean("gemfire.verboseConflictString");
+
+  /**
    * This constructor is used to create a singleton used by LocalRegion to
    * signal that noop invalidate op has been performed. The instance returned by
    * this constructor is just a marker; it is not good for anything else.
@@ -1511,14 +1516,14 @@ public class TXEntryState implements Releasable
         //           curCmtVersionId =
         // ((CachedDeserializable)curCmtVersionId).getDeserializedValue();
         //         }
-        String fromString = calcConflictString(getOriginalVersionId());
-        String toString = calcConflictString(curCmtVersionId);
-        if (fromString.equals(toString)) {
-          throw new CommitConflictException(LocalizedStrings.TXEntryState_ENTRY_FOR_KEY_0_ON_REGION_1_HAD_A_STATE_CHANGE.toLocalizedString(new Object[] {key, r.getDisplayName()}));
-        }
-        else {
-          throw new CommitConflictException(LocalizedStrings.TXEntryState_ENTRY_FOR_KEY_0_ON_REGION_1_HAD_ALREADY_BEEN_CHANGED_FROM_2_TO_3.toLocalizedString(new Object[] {key, r.getDisplayName(), fromString, toString}));
+        if (VERBOSE_CONFLICT_STRING || logger.isDebugEnabled()) {
+          String fromString = calcConflictString(getOriginalVersionId());
+          String toString = calcConflictString(curCmtVersionId);
+          if (!fromString.equals(toString)) {
+            throw new CommitConflictException(LocalizedStrings.TXEntryState_ENTRY_FOR_KEY_0_ON_REGION_1_HAD_ALREADY_BEEN_CHANGED_FROM_2_TO_3.toLocalizedString(new Object[] {key, r.getDisplayName(), fromString, toString}));
+          }
         }
+        throw new CommitConflictException(LocalizedStrings.TXEntryState_ENTRY_FOR_KEY_0_ON_REGION_1_HAD_A_STATE_CHANGE.toLocalizedString(new Object[]{key, r.getDisplayName()}));
       }
       } finally {
         OffHeapHelper.release(curCmtVersionId);


[10/26] incubator-geode git commit: Merge branch 'feature/GEODE-313' into develop This closes #19

Posted by bs...@apache.org.
Merge branch 'feature/GEODE-313' into develop
This closes #19


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/391a93d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/391a93d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/391a93d5

Branch: refs/heads/feature/GEODE-77
Commit: 391a93d591c02c2b122f878c8e34fc4de10a0f40
Parents: 1517f88 c1de3fe
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Thu Sep 10 12:06:59 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Thu Sep 10 12:07:55 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/redis/RegionProvider.java  | 44 ++++++++++----------
 1 file changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------



[08/26] incubator-geode git commit: GEODE-311: make pdx logging info level

Posted by bs...@apache.org.
GEODE-311: make pdx logging info level


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/16571a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/16571a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/16571a6f

Branch: refs/heads/feature/GEODE-77
Commit: 16571a6f01ba75fa6b8ebe1d80af5a555161859e
Parents: f770621
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Sep 4 13:47:42 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Sep 9 13:52:42 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/pdx/internal/TypeRegistry.java  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16571a6f/gemfire-core/src/main/java/com/gemstone/gemfire/pdx/internal/TypeRegistry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/pdx/internal/TypeRegistry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/pdx/internal/TypeRegistry.java
index 4ca1a90..c0bb19c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/pdx/internal/TypeRegistry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/pdx/internal/TypeRegistry.java
@@ -122,6 +122,9 @@ public class TypeRegistry {
       if(pdxType != null) {
         this.idToType.put(typeId, pdxType);
         this.typeToId.put(pdxType, typeId);
+        if (logger.isInfoEnabled()) {
+          logger.info("Adding: {}", pdxType.toFormattedString());
+        }
         if (logger.isDebugEnabled()) {
           logger.debug("Adding entry into pdx type registry, typeId: {}  {}", typeId, pdxType);
         }
@@ -186,8 +189,8 @@ public class TypeRegistry {
     if(oldType == null) {
       this.idToType.put(id, newType);
       this.typeToId.put(newType, id);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Defining: {}", newType.toFormattedString());
+      if (logger.isInfoEnabled()) {
+        logger.info("Defining: {}", newType.toFormattedString());
       }
     } else {
       //TODO - this might be overkill, but type definition should be rare enough.
@@ -205,8 +208,8 @@ public class TypeRegistry {
       this.distributedTypeRegistry.addRemoteType(typeId, newType);
       this.idToType.put(typeId, newType);
       this.typeToId.put(newType, typeId);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Adding, from remote WAN: {}", newType.toFormattedString());
+      if (logger.isInfoEnabled()) {
+        logger.info("Adding, from remote WAN: {}", newType.toFormattedString());
       }
     } else {
     //TODO - this might be overkill, but type definition should be rare enough.
@@ -483,8 +486,8 @@ public class TypeRegistry {
     this.distributedTypeRegistry.addImportedType(typeId, importedType);
     this.idToType.put(typeId, importedType);
     this.typeToId.put(importedType, typeId);
-    if (logger.isDebugEnabled()) {
-      logger.debug("Importing type: {}", importedType.toFormattedString());
+    if (logger.isInfoEnabled()) {
+      logger.info("Importing type: {}", importedType.toFormattedString());
     }
   }