You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/14 22:30:11 UTC

[01/18] incubator-geode git commit: GEODE-1178 Unexpected DistributedSystemDisconnectedException caused by RejectedExecutionException

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1162 d9344e0a7 -> e2813de40


GEODE-1178 Unexpected DistributedSystemDisconnectedException caused by RejectedExecutionException

This has been reported to JGroups.  While they're deciding what to do about
it I have coded a workaround in our StatRecorder class.  StatRecorder sits
in the JGroups stack just above the transport protocol that is throwing this
exception from its down() method.  StatRecorder will now catch the exception
and, after sleeping a short amount of time (10ms) it will retry as long as
the Manager is not shutting down.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/39e94bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/39e94bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/39e94bc8

Branch: refs/heads/feature/GEODE-1162
Commit: 39e94bc8beb22b62ed727640bbad3511affc9923
Parents: 86ab7cd
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Apr 12 10:43:26 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Apr 12 10:45:40 2016 -0700

----------------------------------------------------------------------
 .../internal/membership/gms/Services.java       | 23 +++++--
 .../membership/gms/fd/GMSHealthMonitor.java     |  1 -
 .../gms/messenger/JGroupsMessenger.java         |  2 +-
 .../membership/gms/messenger/StatRecorder.java  | 30 +++++++--
 .../gms/membership/StatRecorderJUnitTest.java   | 67 ++++++++++++++------
 5 files changed, 93 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 4484c00..4f5a1a4 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -102,6 +102,19 @@ public class Services {
   }
   
 
+  /**
+   * for testing only - create a non-functional Services object with a Stopper
+   */
+  public Services() {
+    this.cancelCriterion = new Stopper();
+    this.stats = null;
+    this.config = null;
+    this.manager = null;
+    this.joinLeave = null;
+    this.healthMon = null;
+    this.messenger = null;
+    this.auth = null;
+  }
 
   public Services(
       DistributedMembershipListener listener, DistributionConfig config,
@@ -348,10 +361,10 @@ public class Services {
   public boolean isAutoReconnectEnabled() {
     return !getConfig().getDistributionConfig().getDisableAutoReconnect();
   }
-  
+
   public class Stopper extends CancelCriterion {
     volatile String reasonForStopping = null;
-    
+
     public void cancel(String reason) {
       this.reasonForStopping = reason;
     }
@@ -362,7 +375,7 @@ public class Services {
         return Services.this.shutdownCause.toString();
       return reasonForStopping;
     }
-    
+
     public boolean isCancelInProgress() {
       return cancelInProgress() != null;
     }
@@ -381,7 +394,7 @@ public class Services {
         }
       }
     }
-    
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 510c5a8..5427d77 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1190,7 +1190,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * @param initiator
    * @param sMembers
    * @param cv
-   * @param initiateRemoval
    */
   private void checkIfAvailable(final InternalDistributedMember initiator,
       List<SuspectRequest> sMembers, final NetView cv) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 2dfeeaa..a9abea3 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -293,7 +293,7 @@ public class JGroupsMessenger implements Messenger {
     // give the stats to the jchannel statistics recorder
     StatRecorder sr = (StatRecorder)myChannel.getProtocolStack().findProtocol(StatRecorder.class);
     if (sr != null) {
-      sr.setDMStats(services.getStatistics());
+      sr.setServices(services);
     }
     
     Transport transport = (Transport)myChannel.getProtocolStack().getTransport();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
index e29d71e..e013de6 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
@@ -32,6 +32,8 @@ import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 
+import java.util.concurrent.RejectedExecutionException;
+
 /**
  * JGroups doesn't capture quite the stats we want so this protocol is
  * inserted into the stack to gather the missing ones.
@@ -46,17 +48,19 @@ public class StatRecorder extends Protocol {
   private static final int INCOMING = 1;
   
   DMStats stats;
+  Services services;
   
   private final short nakackHeaderId = ClassConfigurator.getProtocolId(NAKACK2.class);
   private final short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
   private final short frag2HeaderId = ClassConfigurator.getProtocolId(FRAG2.class);
   
   /**
-   * set the statistics object to modify when events are detected
-   * @param stats
+   * sets the services object of the GMS that is using this recorder
+   * @param services the Services collective of the GMS
    */
-  public void setDMStats(DMStats stats) {
-    this.stats = stats;
+  public void setServices(Services services) {
+    this.services = services;
+    this.stats = services.getStatistics();
   }
   
   @Override
@@ -81,7 +85,23 @@ public class StatRecorder extends Protocol {
       filter(msg, OUTGOING);
       break;
     }
-    return down_prot.down(evt);
+    do {
+      try {
+        return down_prot.down(evt);
+      } catch (RejectedExecutionException e) {
+        logger.debug("retrying JGroups message transmission due to rejected execution (GEODE-1178)");
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ie) {
+          // down() does not throw InterruptedException so we can only set the interrupt flag and return
+          Thread.currentThread().interrupt();
+          return null;
+        }
+      }
+    } while (services != null
+      && !services.getManager().shutdownInProgress()
+      && !services.getCancelCriterion().isCancelInProgress());
+    return null;
   }
   
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/39e94bc8/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
index 91d4a57..b7b80ac 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/StatRecorderJUnitTest.java
@@ -16,33 +16,34 @@
  */
 package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Properties;
-
-import org.jgroups.Event;
-import org.jgroups.Message;
-import org.jgroups.protocols.UNICAST3.Header;
-import org.jgroups.protocols.pbcast.NakAckHeader2;
-import org.jgroups.stack.Protocol;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.LonerDistributionManager.DummyDMStats;
 import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder;
 import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.protocols.UNICAST3.Header;
+import org.jgroups.protocols.pbcast.NakAckHeader2;
+import org.jgroups.stack.Protocol;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+import java.util.concurrent.RejectedExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.*;
 
 /**
  * This class tests the GMS StatRecorder class, which records JGroups
@@ -53,14 +54,17 @@ public class StatRecorderJUnitTest {
   Protocol mockDownProtocol, mockUpProtocol;
   StatRecorder recorder;
   MyStats stats = new MyStats();
+  Services services;
   
   @Before
   public void initMocks() throws Exception {
     // create a StatRecorder that has mock up/down protocols and stats
     mockDownProtocol = mock(Protocol.class);
     mockUpProtocol = mock(Protocol.class);
+    services = mock(Services.class);
+    when(services.getStatistics()).thenReturn(stats);
     recorder = new StatRecorder();
-    recorder.setDMStats(stats);
+    recorder.setServices(services);
     recorder.setUpProtocol(mockUpProtocol);
     recorder.setDownProtocol(mockDownProtocol);
   }
@@ -94,6 +98,33 @@ public class StatRecorderJUnitTest {
         stats.ucastRetransmits == 1);
   }
 
+
+  @Test
+  public void recorderHandlesRejectedExecution() throws Exception {
+    Message msg = mock(Message.class);
+    when(msg.getHeader(any(Short.class))).thenReturn(Header.createDataHeader(1L, (short)1, true));
+    when(msg.size()).thenReturn(150L);
+
+
+    // GEODE-1178, the TP protocol may throw a RejectedExecutionException & StatRecorder should retry
+    when(mockDownProtocol.down(any(Event.class))).thenThrow(new RejectedExecutionException());
+
+    // after the first down() throws an exception we want StatRecorder to retry, so
+    // we set the Manager to say no shutdown is in progress the first time and then say
+    // one IS in progress so we can break out of the StatRecorder exception handling loop
+    when(services.getCancelCriterion()).thenReturn(new Services().getCancelCriterion());
+    Manager manager = mock(Manager.class);
+    when(services.getManager()).thenReturn(manager);
+    when(manager.shutdownInProgress()).thenReturn(Boolean.FALSE, Boolean.TRUE);
+
+    verify(mockDownProtocol, never()).down(isA(Event.class));
+
+    Event evt = new Event(Event.MSG, msg);
+    recorder.down(evt);
+
+    verify(mockDownProtocol, times(2)).down(isA(Event.class));
+  }
+
   /**
    * ensure that multicast events are recorded in DMStats
    */


[05/18] incubator-geode git commit: GEODE-1218: Marking jna as not optional

Posted by kl...@apache.org.
GEODE-1218: Marking jna as not optional

JNA is used for common features like offheap memory and disk stores, so
it should not be marked optional.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fbee35cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fbee35cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fbee35cc

Branch: refs/heads/feature/GEODE-1162
Commit: fbee35cc424f80d5f9ed316da99e0caeb5600ddc
Parents: 69cd4a7
Author: Dan Smith <up...@apache.org>
Authored: Tue Apr 12 13:43:14 2016 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Wed Apr 13 10:08:42 2016 -0700

----------------------------------------------------------------------
 geode-core/build.gradle | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fbee35cc/geode-core/build.gradle
----------------------------------------------------------------------
diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 8216d2a..2206018 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -61,9 +61,7 @@ dependencies {
   compile ('mx4j:mx4j-tools:' + project.'mx4j.version') { 
     ext.optional = true;
   }
-  compile ('net.java.dev.jna:jna:' + project.'jna.version') {
-    ext.optional = true
-  }
+  compile ('net.java.dev.jna:jna:' + project.'jna.version')
   provided ('org.apache.hadoop:hadoop-common:' + project.'hadoop.version') {
     transitive=false
   }


[12/18] incubator-geode git commit: GEODE-1220: Minor refactoring of LuceneServiceImplJUnitTest and LuceneIndexRecoveryHAJUnitTest

Posted by kl...@apache.org.
GEODE-1220: Minor refactoring of LuceneServiceImplJUnitTest and LuceneIndexRecoveryHAJUnitTest

Removed sleep with awaitility until async queue size is 0
Removed unnecessary/dead code
Renamed tests to be slightly more descriptive


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7d5f39ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7d5f39ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7d5f39ae

Branch: refs/heads/feature/GEODE-1162
Commit: 7d5f39aeebeb345600e8608d77a2d42720871907
Parents: 6c7a0d2
Author: Jason Huynh <hu...@gmail.com>
Authored: Tue Apr 12 15:11:18 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Wed Apr 13 13:39:25 2016 -0700

----------------------------------------------------------------------
 .../LuceneIndexRecoveryHAJUnitTest.java         | 20 ++++++---
 .../internal/LuceneServiceImplJUnitTest.java    | 44 ++++++++++----------
 2 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7d5f39ae/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAJUnitTest.java
index 405c986..9ab6e81 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAJUnitTest.java
@@ -19,9 +19,10 @@
 
 package com.gemstone.gemfire.cache.lucene.internal;
 
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -41,6 +42,7 @@ import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
 import com.gemstone.gemfire.cache.lucene.LuceneQuery;
 import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
 import com.gemstone.gemfire.cache.lucene.LuceneService;
@@ -54,6 +56,7 @@ import com.gemstone.gemfire.internal.cache.EvictionAttributesImpl;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.jayway.awaitility.Awaitility;
 
 @Category(IntegrationTest.class)
 public class LuceneIndexRecoveryHAJUnitTest {
@@ -120,6 +123,8 @@ public class LuceneIndexRecoveryHAJUnitTest {
 
   @Test
   public void recoverPersistentIndex() throws Exception {
+    String aeqId = LuceneServiceImpl.getUniqueIndexName(INDEX, REGION);
+
     LuceneService service = LuceneServiceProvider.get(cache);
     service.createIndex(INDEX, REGION, Type1.fields);
 
@@ -133,8 +138,7 @@ public class LuceneIndexRecoveryHAJUnitTest {
     value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
     userRegion.put("value3", value);
 
-    // TODO flush queue
-    TimeUnit.MILLISECONDS.sleep(500);
+    waitUntilQueueEmpty(aeqId);
 
     LuceneQuery<Integer, Type1> query = service.createLuceneQueryFactory().create(INDEX, REGION, "s:world");
     LuceneQueryResults<Integer, Type1> results = query.search();
@@ -153,7 +157,6 @@ public class LuceneIndexRecoveryHAJUnitTest {
     results = query.search();
     Assert.assertEquals(3, results.size());
 
-    String aeqId = LuceneServiceImpl.getUniqueIndexName(INDEX, REGION);
     PartitionedRegion chunkRegion = (PartitionedRegion) cache.getRegion(aeqId + ".chunks");
     assertNotNull(chunkRegion);
     chunkRegion.destroyRegion();
@@ -185,8 +188,7 @@ public class LuceneIndexRecoveryHAJUnitTest {
     value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
     userRegion.put("value3", value);
 
-    // TODO flush queue
-    TimeUnit.MILLISECONDS.sleep(500);
+    waitUntilQueueEmpty(aeqId);
 
     PartitionedRegion fileRegion = (PartitionedRegion) cache.getRegion(aeqId + ".files");
     assertNotNull(fileRegion);
@@ -198,4 +200,10 @@ public class LuceneIndexRecoveryHAJUnitTest {
     LuceneQueryResults<Integer, Type1> results = query.search();
     Assert.assertEquals(3, results.size());
   }
+
+  private void waitUntilQueueEmpty(final String aeqId) {
+    // TODO flush queue
+    AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId);
+    Awaitility.waitAtMost(1000, TimeUnit.MILLISECONDS).until(() -> assertEquals(0, queue.size()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7d5f39ae/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 3a6f38c..d0e9865 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -39,8 +39,10 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheFactory;
@@ -48,6 +50,7 @@ import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
 import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
 import com.gemstone.gemfire.cache.lucene.internal.distributed.LuceneFunction;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -67,23 +70,22 @@ public class LuceneServiceImplJUnitTest {
   private IndexWriter writer;
   LuceneServiceImpl service = null;
   private static final Logger logger = LogService.getLogger();
-  
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   // lucene service will register query execution function on initialization
   @Test
   public void shouldRegisterQueryFunction() {
     Function function = FunctionService.getFunction(LuceneFunction.ID);
     assertNull(function);
 
-    cache = createBasicCache();
+    cache = getCache();
     new LuceneServiceImpl().init(cache);
 
     function = FunctionService.getFunction(LuceneFunction.ID);
     assertNotNull(function);
   }
-  
-  private GemFireCacheImpl createBasicCache() {
-    return (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").create();
-  }
 
   @After
   public void destroyCache() {
@@ -93,34 +95,35 @@ public class LuceneServiceImplJUnitTest {
     }
   }
   
-  private void getCache() {
+  private Cache getCache() {
     try {
        cache = CacheFactory.getAnyInstance();
     } catch (Exception e) {
       //ignore
     }
     if (null == cache) {
-      cache = createBasicCache();
+      cache = new CacheFactory().set("mcast-port", "0").create();
     }
+    return cache;
   }
   
-  private void getService() {
+  private LuceneService getService() {
     if (cache == null) {
       getCache();
     }
     if (service == null) {
       service = (LuceneServiceImpl)LuceneServiceProvider.get(cache);
     }
+    return service;
   }
   
   private LocalRegion createPR(String regionName, boolean isSubRegion) {
     if (isSubRegion) {
-      LocalRegion root = (LocalRegion)cache.createRegionFactory(RegionShortcut.REPLICATE).create("root");
+      LocalRegion root = (LocalRegion)cache.createRegionFactory(RegionShortcut.PARTITION).create("root");
       LocalRegion region = (LocalRegion)cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT).
           createSubregion(root, regionName);
       return region;
     } else {
-      LocalRegion root = (LocalRegion)cache.createRegionFactory(RegionShortcut.REPLICATE).create("root");
       LocalRegion region = (LocalRegion)cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT).
           create(regionName);
       return region;
@@ -134,25 +137,21 @@ public class LuceneServiceImplJUnitTest {
           createSubregion(root, regionName);
       return region;
     } else {
-      LocalRegion root = (LocalRegion)cache.createRegionFactory(RegionShortcut.REPLICATE).create("root");
       LocalRegion region = (LocalRegion)cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).
           create(regionName);
       return region;
     }
   }
-  
-  /**Test that we don't allow the user
-   * to create the region first.
-   */
+
   @Test(expected = IllegalStateException.class)
-  public void createRegionFirst() throws IOException, ParseException {
+  public void cannotCreateLuceneIndexAfterRegionHasBeenCreated() throws IOException, ParseException {
     getService();
     LocalRegion userRegion = createPR("PR1", false);
     service.createIndex("index1", "PR1", "field1", "field2", "field3");
   }
 
   @Test
-  public void testCreateIndexForPR() throws IOException, ParseException {
+  public void canCreateLuceneIndexForPR() throws IOException, ParseException {
     getService();
     service.createIndex("index1", "PR1", "field1", "field2", "field3");
     LocalRegion userRegion = createPR("PR1", false);
@@ -184,7 +183,7 @@ public class LuceneServiceImplJUnitTest {
   }
 
   @Test
-  public void testCreateIndexForPRWithAnalyzer() throws IOException, ParseException {
+  public void canCreateLuceneIndexForPRWithAnalyzer() throws IOException, ParseException {
     getService();
     StandardAnalyzer sa = new StandardAnalyzer();
     KeywordAnalyzer ka = new KeywordAnalyzer();
@@ -217,12 +216,13 @@ public class LuceneServiceImplJUnitTest {
     assertTrue(chunkPR != null);
   }
   
-  @Test (expected=UnsupportedOperationException.class)
-  public void testCreateIndexForRR() throws IOException, ParseException {
+  @Test
+  public void cannotCreateLuceneIndexForReplicateRegion() throws IOException, ParseException {
+    expectedException.expect(UnsupportedOperationException.class);
+    expectedException.expectMessage("Lucene indexes on replicated regions are not supported");
     getService();
     service.createIndex("index1", "RR1", "field1", "field2", "field3");
     createRR("RR1", false);
-    fail("Expect UnsupportedOperationException");
   }
 
 }


[16/18] incubator-geode git commit: GEODE-1201: Reverting gradle changes

Posted by kl...@apache.org.
GEODE-1201: Reverting gradle changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5c89fab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5c89fab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5c89fab6

Branch: refs/heads/feature/GEODE-1162
Commit: 5c89fab63991da25a1262caf29a94bdc8eedbd6d
Parents: d1e4825
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Apr 13 11:42:21 2016 +1000
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Thu Apr 14 10:07:06 2016 +1000

----------------------------------------------------------------------
 geode-assembly/build.gradle | 6 ------
 1 file changed, 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5c89fab6/geode-assembly/build.gradle
----------------------------------------------------------------------
diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index 8f14fc1..b7d05e2 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -81,7 +81,6 @@ sourceSets {
 
 test {
   // test from the actual classpath not the gradle classpath
-  dependsOn copyRuntimeLibs
   dependsOn installDist
   // @TODO: this doesn't seem to be working need to get basename first.
   classpath += files "$buildDir/install/apache-geode/lib/geode-dependencies.jar"
@@ -113,11 +112,6 @@ task defaultCacheConfig(type: JavaExec, dependsOn: classes) {
   }
 }
 
-task copyRuntimeLibs(type: Copy) {
-  into "lib"
-  from configurations.testRuntime - configurations.runtime
-}
-
 // This closure sets the gemfire classpath.  If we add another jar to the classpath it must
 // be included in the filter logic below.
 def cp = {


[08/18] incubator-geode git commit: GEODE-1198 CI Failure: DistributedSystemDUnitTest.testConflictingUDPPort

Posted by kl...@apache.org.
GEODE-1198 CI Failure: DistributedSystemDUnitTest.testConflictingUDPPort

For this test, the membership-port-range has only 3 ports available. If in
some rare cases, one of the ports is used by others, it will get RMIException.
So just ignore it. Since adding one more port will also fail the test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9d20f22f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9d20f22f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9d20f22f

Branch: refs/heads/feature/GEODE-1162
Commit: 9d20f22ff99fc76bf2da3218f02ecb95cbf39b32
Parents: 80533ba
Author: Jianxia Chen <jc...@pivotal.io>
Authored: Wed Apr 13 10:15:08 2016 -0700
Committer: Jianxia Chen <jc...@pivotal.io>
Committed: Wed Apr 13 10:19:08 2016 -0700

----------------------------------------------------------------------
 .../distributed/DistributedSystemDUnitTest.java | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9d20f22f/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedSystemDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedSystemDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedSystemDUnitTest.java
index 0109845..22bb176 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedSystemDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedSystemDUnitTest.java
@@ -16,7 +16,10 @@
  */
 package com.gemstone.gemfire.distributed;
 
-import static com.gemstone.gemfire.test.dunit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.net.Inet4Address;
 import java.net.Inet6Address;
@@ -26,8 +29,12 @@ import java.util.Enumeration;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.SystemConnectException;
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheFactory;
@@ -50,11 +57,10 @@ import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.RMIException;
 import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 /**
  * Tests the functionality of the {@link DistributedSystem} class.
@@ -387,6 +393,14 @@ public class DistributedSystemDUnitTest extends JUnit4DistributedTestCase {
           } catch (GemFireConfigException e) {
             return; // 
           }
+          catch (RMIException e) {
+            if (e.getCause() instanceof SystemConnectException) {
+              //GEODE-1198: for this test, the membership-port-range has only 3 ports available.
+              //If in some rare cases, one of the ports is used by others, it will get this 
+              //exception. So just ignore it. Since adding one more port will also fail the test.
+              return;
+            }
+          }
           fail("expected a GemFireConfigException but didn't get one");
         }
       });


[03/18] incubator-geode git commit: GEODE-1214: Better error handling for compilation failures

Posted by kl...@apache.org.
GEODE-1214: Better error handling for compilation failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/90ab09cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/90ab09cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/90ab09cc

Branch: refs/heads/feature/GEODE-1162
Commit: 90ab09cc429643e3b7d9ec336fccbc6b2c0402c4
Parents: c67a1c9
Author: Jens Deppe <jd...@pivotal.io>
Authored: Tue Apr 12 09:01:00 2016 -0700
Committer: Jens Deppe <jd...@pivotal.io>
Committed: Tue Apr 12 12:44:41 2016 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/ClassBuilder.java    | 13 ++++++++++++-
 .../gemfire/internal/JarClassLoaderJUnitTest.java  | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/90ab09cc/geode-core/src/test/java/com/gemstone/gemfire/internal/ClassBuilder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/ClassBuilder.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/ClassBuilder.java
index 118ad86..780ed4f 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/ClassBuilder.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/ClassBuilder.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
+import javax.tools.Diagnostic;
+import javax.tools.DiagnosticCollector;
 import javax.tools.FileObject;
 import javax.tools.ForwardingJavaFileManager;
 import javax.tools.JavaCompiler;
@@ -214,7 +216,16 @@ public class ClassBuilder implements Serializable {
     fileObjects.add(new JavaSourceFromString(className, classCode));
 
     List<String> options = Arrays.asList("-classpath", this.classPath);
-    javaCompiler.getTask(null, fileManager, null, options, null, fileObjects).call();
+    DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
+    if (! javaCompiler.getTask(null, fileManager, diagnostics, options, null, fileObjects).call()) {
+      StringBuilder errorMsg = new StringBuilder();
+      for (Diagnostic d : diagnostics.getDiagnostics()) {
+        String err = String.format("Compilation error: Line %d - %s%n", d.getLineNumber(), d.getMessage(null));
+        errorMsg.append(err);
+        System.err.print(err);
+      }
+      throw new IOException(errorMsg.toString());
+    }
     return byteArrayOutputStream.toByteArray();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/90ab09cc/geode-core/src/test/java/com/gemstone/gemfire/internal/JarClassLoaderJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/JarClassLoaderJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/JarClassLoaderJUnitTest.java
index 20c7990..56b715f 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/JarClassLoaderJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/JarClassLoaderJUnitTest.java
@@ -171,6 +171,23 @@ public class JarClassLoaderJUnitTest {
   }
 
   @Test
+  public void testFailingCompilation() throws Exception {
+    StringBuffer stringBuffer = new StringBuffer();
+    stringBuffer.append("import com.gemstone.gemfire.cache.Declarable;");
+    stringBuffer.append("import com.gemstone.gemfire.cache.execute.Function;");
+    stringBuffer.append("import com.gemstone.gemfire.cache.execute.FunctionContext;");
+    stringBuffer.append("public class JarClassLoaderJUnitFunction implements Function {}");
+    String functionString = stringBuffer.toString();
+
+    try {
+      this.classBuilder.createJarFromClassContent("JarClassLoaderJUnitFunction", functionString);
+      fail("This code should have failed to compile and thrown an exception");
+    } catch (Exception ex) {
+      // All good
+    }
+  }
+
+  @Test
   public void testFunctions() throws IOException, ClassNotFoundException {
     final File jarFile1 = new File(JAR_PREFIX + "JarClassLoaderJUnit.jar#1");
     final File jarFile2 = new File(JAR_PREFIX + "JarClassLoaderJUnit.jar#2");


[18/18] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-1162

Posted by kl...@apache.org.
Merge remote-tracking branch 'origin/develop' into feature/GEODE-1162


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e2813de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e2813de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e2813de4

Branch: refs/heads/feature/GEODE-1162
Commit: e2813de4037f23647ee315975d0ab42e7865f9fb
Parents: d9344e0 7b3c8cb
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Apr 14 10:03:50 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Apr 14 10:03:50 2016 -0700

----------------------------------------------------------------------
 geode-core/build.gradle                         |   8 +-
 .../client/internal/CloseConnectionOp.java      |   0
 .../cache/client/internal/ConnectionImpl.java   |   8 +-
 .../internal/membership/gms/Services.java       |  23 +-
 .../membership/gms/fd/GMSHealthMonitor.java     |  20 +-
 .../gms/messenger/JGroupsMessenger.java         |   2 +-
 .../membership/gms/messenger/StatRecorder.java  |  30 +-
 .../gemfire/internal/SocketCreator.java         |  25 +-
 .../gemfire/internal/SocketIOWithTimeout.java   | 491 -------------------
 .../gemfire/internal/SocketInputStream.java     | 181 -------
 .../gemfire/internal/SocketInputWrapper.java    |  93 ----
 .../gemfire/internal/SocketOutputStream.java    | 174 -------
 .../gemstone/gemfire/internal/SocketUtils.java  | 220 ---------
 .../gemfire/internal/cache/EntryEventImpl.java  |   4 +-
 .../internal/cache/GemFireCacheImpl.java        |  14 +-
 .../cache/SearchLoadAndWriteProcessor.java      |  28 +-
 .../cache/execute/util/CommitFunction.java      | 141 ------
 .../execute/util/NestedTransactionFunction.java | 116 -----
 .../cache/execute/util/RollbackFunction.java    | 136 -----
 .../cache/tier/sockets/AcceptorImpl.java        |   3 +-
 .../cache/tier/sockets/CacheClientNotifier.java | 105 +---
 .../cache/tier/sockets/CacheClientUpdater.java  |   5 +-
 .../internal/cache/tier/sockets/HandShake.java  |  13 +-
 .../internal/cache/tier/sockets/Message.java    |   3 +-
 .../cache/tier/sockets/ServerConnection.java    |   5 +-
 .../tier/sockets/ServerHandShakeProcessor.java  |  26 +-
 .../tier/sockets/command/CloseConnection.java   |   0
 .../serial/SerialSecondaryGatewayListener.java  |  19 +-
 .../gemfire/internal/shared/NativeCalls.java    |   2 +-
 .../gemfire/internal/tcp/Connection.java        |   5 +-
 .../gemfire/management/internal/RestAgent.java  |  51 +-
 .../distributed/DistributedSystemDUnitTest.java |  20 +-
 .../gms/membership/StatRecorderJUnitTest.java   |  67 ++-
 .../gemstone/gemfire/internal/ClassBuilder.java |  13 +-
 .../internal/JarClassLoaderJUnitTest.java       |  17 +
 .../cache/ClientServerTransactionDUnitTest.java |   2 -
 .../gemfire/internal/cache/CommitFunction.java  | 142 ++++++
 .../internal/cache/EntryEventImplTest.java      |  71 +++
 .../cache/NestedTransactionFunction.java        | 116 +++++
 .../internal/cache/RollbackFunction.java        | 137 ++++++
 .../cache/SearchLoadAndWriteProcessorTest.java  |  68 +++
 .../internal/directory/RegionDirectory.java     |  20 +-
 .../lucene/internal/filesystem/FileSystem.java  |   5 +
 .../repository/IndexRepositoryImpl.java         |   2 +-
 .../repository/serializer/SerializerUtil.java   |  16 +-
 .../LuceneIndexRecoveryHAJUnitTest.java         |  20 +-
 .../internal/LuceneServiceImplJUnitTest.java    |  44 +-
 .../IndexRepositoryImplPerformanceTest.java     |   4 +-
 geode-site/website/Rules                        |   6 +
 geode-site/website/layouts/default.html         |   2 +-
 geode-site/website/layouts/header.html          |   2 +-
 geode-site/website/layouts/releases.html        |   1 +
 .../web/swagger/config/RestApiPathProvider.java |  27 +-
 gradle/dependency-versions.properties           |   2 +-
 54 files changed, 916 insertions(+), 1839 deletions(-)
----------------------------------------------------------------------



[14/18] incubator-geode git commit: GEODE-1018: Retrieve queues only once gateway listener to avoid a race

Posted by kl...@apache.org.
GEODE-1018: Retrieve queues only once gateway listener to avoid a race

The second call to getQueues could have returned null due to a
concurrent change. By fetching the queues only once we avoid a race.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a885ac14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a885ac14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a885ac14

Branch: refs/heads/feature/GEODE-1162
Commit: a885ac140b50d4f3ec74ede9fcdff98623d8dc81
Parents: bbf705e
Author: Dan Smith <up...@apache.org>
Authored: Wed Apr 13 10:12:29 2016 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Wed Apr 13 16:54:58 2016 -0700

----------------------------------------------------------------------
 .../serial/SerialSecondaryGatewayListener.java   | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a885ac14/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialSecondaryGatewayListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialSecondaryGatewayListener.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialSecondaryGatewayListener.java
index 5cb0ec0..8250270 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialSecondaryGatewayListener.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialSecondaryGatewayListener.java
@@ -16,10 +16,13 @@
  */
 package com.gemstone.gemfire.internal.cache.wan.serial;
 
+import java.util.Set;
+
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -55,11 +58,8 @@ public class SerialSecondaryGatewayListener extends CacheListenerAdapter
    }
    // There is a small window where queue has not been created fully yet. 
    // The underlying region of the queue is created, and it receives afterDestroy callback
-   if (this.sender.getQueues() != null && !this.sender.getQueues().isEmpty()) {
-//     int size = 0;
-//     for(RegionQueue q: this.sender.getQueues()) {
-//       size += q.size();
-//     }
+   final Set<RegionQueue> queues = this.sender.getQueues();
+   if (queues != null && !queues.isEmpty()) {
      this.sender.getStatistics().incQueueSize();
    }
    // fix bug 35730
@@ -76,12 +76,9 @@ public class SerialSecondaryGatewayListener extends CacheListenerAdapter
    }
     // fix bug 37603
     // There is a small window where queue has not been created fully yet. The region is created, and it receives afterDestroy callback.
-   
-   if (this.sender.getQueues() != null && !this.sender.getQueues().isEmpty()) {
-//     int size = 0;
-//     for(RegionQueue q: this.sender.getQueues()) {
-//       size += q.size();
-//     }
+
+   final Set<RegionQueue> queues = this.sender.getQueues();
+   if (queues != null && !queues.isEmpty()) {
      this.sender.getStatistics().decQueueSize();
    }
 


[02/18] incubator-geode git commit: GEODE-235: Add region name to EntryEventImpl toString

Posted by kl...@apache.org.
GEODE-235: Add region name to EntryEventImpl toString

A new EntryEventImplTest was added. It currently only
verifies that the toString output includes the region name.

This closes #125


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c67a1c91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c67a1c91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c67a1c91

Branch: refs/heads/feature/GEODE-1162
Commit: c67a1c913d72062044767717078d8c5bcc5200a9
Parents: 39e94bc
Author: Scott Jewell <sj...@pivotal.io>
Authored: Tue Apr 5 11:03:06 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Apr 12 10:50:49 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/EntryEventImpl.java  |  4 +-
 .../internal/cache/EntryEventImplTest.java      | 71 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c67a1c91/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index ba9ac11..3c87654 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -2306,6 +2306,8 @@ public class EntryEventImpl
 
     buf.append("op=");
     buf.append(getOperation());
+    buf.append(";region=");
+    buf.append(getRegion().getFullPath());
     buf.append(";key=");
     buf.append(this.getKey());
     buf.append(";oldValue=");
@@ -3121,4 +3123,4 @@ public class EntryEventImpl
   public final void setLoadedFromHDFS(boolean loadedFromHDFS) {
     this.loadedFromHDFS = loadedFromHDFS;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c67a1c91/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
new file mode 100644
index 0000000..c3e057c
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import static org.mockito.Mockito.*;
+
+@Category(UnitTest.class)
+public class EntryEventImplTest {
+
+  String expectedRegionName = "ExpectedFullRegionPathName";
+  String key = "key1";
+  String value = "value1";
+  KeyInfo keyInfo = new KeyInfo(key, value, null);
+
+  @Test
+  public void verifyToStringOutputHasRegionName() {
+    // mock a region object
+    LocalRegion region = mock(LocalRegion.class);
+    doReturn(expectedRegionName).when(region).getFullPath();
+    doReturn(keyInfo).when(region).getKeyInfo(any(), any(), any());
+
+    // create entryevent for the region
+    EntryEventImpl e = createEntryEvent(region);
+    
+    // The name of the region should be in the toString text
+    String toStringValue = e.toString();
+    assertTrue("String " + expectedRegionName + " was not in toString text: " + toStringValue, toStringValue.indexOf(expectedRegionName) > 0);
+
+    // verify that toString called getFullPath method of region object
+    verify(region, Mockito.times(1)).getFullPath();
+  }
+
+  private EntryEventImpl createEntryEvent(LocalRegion l) {
+    // create a dummy event id
+    byte[] memId = { 1,2,3 };
+    EventID eventId = new EventID(memId, 11, 12, 13);
+
+    // create an event
+    EntryEventImpl event = EntryEventImpl.create(l, Operation.CREATE, key,
+        value, null,  false /* origin remote */, null,
+        false /* generateCallbacks */,
+        eventId);
+    // avoid calling invokeCallbacks
+    event.callbacksInvoked(true);
+
+    return event;
+  }
+}
\ No newline at end of file


[17/18] incubator-geode git commit: GEODE-679 Explore removing SocketIOWithTimeout and other classes related to FD soft leak

Posted by kl...@apache.org.
GEODE-679 Explore removing SocketIOWithTimeout and other classes related to FD soft leak

SocketUtils was added to avoid a file descriptor "leak" caused by the use of NIO socket
channel selectors.  This was spurred by a HADOOP JIRA ticket that claimed that
sun.misc.Cleaners were being used to close selectors (see
https://issues.apache.org/jira/browse/HADOOP-4346).  I have verified that Cleaners are
no longer used to close selectors and that SocketUtils is not making any difference in
the number of file descriptors created in servers using NIO selectors using the (recently
deleted) FDDUnitTest with modifications to force clients to close their connections to
the server.

So, this change-set removes SocketUtils and associated classes, reverting all modifications
made to introduce it in GemFire 8.0 to use direct method invokations on sockets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7b3c8cb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7b3c8cb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7b3c8cb4

Branch: refs/heads/feature/GEODE-1162
Commit: 7b3c8cb42abc19e62829aa8662a37415afc50b75
Parents: 5c89fab
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Apr 14 09:31:13 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Apr 14 09:34:07 2016 -0700

----------------------------------------------------------------------
 .../client/internal/CloseConnectionOp.java      |   0
 .../cache/client/internal/ConnectionImpl.java   |   8 +-
 .../gemfire/internal/SocketCreator.java         |  17 +-
 .../gemfire/internal/SocketIOWithTimeout.java   | 491 -------------------
 .../gemfire/internal/SocketInputStream.java     | 181 -------
 .../gemfire/internal/SocketInputWrapper.java    |  93 ----
 .../gemfire/internal/SocketOutputStream.java    | 174 -------
 .../gemstone/gemfire/internal/SocketUtils.java  | 220 ---------
 .../internal/cache/GemFireCacheImpl.java        |  14 +-
 .../cache/tier/sockets/AcceptorImpl.java        |   3 +-
 .../cache/tier/sockets/CacheClientNotifier.java | 105 +---
 .../cache/tier/sockets/CacheClientUpdater.java  |   5 +-
 .../internal/cache/tier/sockets/HandShake.java  |  13 +-
 .../internal/cache/tier/sockets/Message.java    |   3 +-
 .../cache/tier/sockets/ServerConnection.java    |   5 +-
 .../tier/sockets/ServerHandShakeProcessor.java  |  26 +-
 .../tier/sockets/command/CloseConnection.java   |   0
 .../gemfire/internal/shared/NativeCalls.java    |   2 +-
 .../gemfire/internal/tcp/Connection.java        |   5 +-
 19 files changed, 52 insertions(+), 1313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
old mode 100644
new mode 100755
index 5016d67..c20b318
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
@@ -35,14 +35,10 @@ import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunct
 import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
 import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.tier.Acceptor;
 import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
@@ -109,8 +105,8 @@ public class ConnectionImpl implements Connection {
     verifySocketBufferSize(socketBufferSize, theSocket.getSendBufferSize(), "send");
     
     theSocket.setSoTimeout(handShakeTimeout);
-    out = SocketUtils.getOutputStream(theSocket);//theSocket.getOutputStream();
-    in = SocketUtils.getInputStream(theSocket);//theSocket.getInputStream();
+    out = theSocket.getOutputStream();
+    in = theSocket.getInputStream();
     this.status = handShake.greet(this, location, communicationMode);
     commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
     if (sender != null) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index acdfbc7..9b7e5d7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -19,8 +19,6 @@ package com.gemstone.gemfire.internal;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.Inet4Address;
 import java.net.Inet6Address;
@@ -995,13 +993,7 @@ public class SocketCreator {
           if (optionalWatcher != null) {
             optionalWatcher.beforeConnect(socket);
           }
-          if (timeout > 0) {
-            SocketUtils.connect(socket, sockaddr, timeout);
-          }
-          else {
-            SocketUtils.connect(socket, sockaddr, 0);
-
-          }
+          socket.connect(sockaddr, Math.max(timeout,0));
           configureClientSSLSocket( socket );
           return socket;
         } 
@@ -1024,12 +1016,7 @@ public class SocketCreator {
             if (optionalWatcher != null) {
               optionalWatcher.beforeConnect(socket);
             }
-          if (timeout > 0) {
-            SocketUtils.connect(socket, sockaddr, timeout);
-            }
-            else {
-              SocketUtils.connect(socket, sockaddr, 0);
-            }
+            socket.connect(sockaddr, Math.max(timeout,0));
           }
           return socket;
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java
deleted file mode 100644
index 9fbead8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Pivotal Additions:
- * Using a ConcurrentHashMap with a LinkedBlockingDeque instead
- * Added a cleanup thread
- * Modifications to trimIdleSelector
- * 
- */
-
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * This supports input and output streams for a socket channels. 
- * These streams can have a timeout.
- */
-public abstract class SocketIOWithTimeout {
-  
-  private static final Logger logger = LogService.getLogger();
-  
-  private SelectableChannel channel;
-  private long timeout;
-  private boolean closed = false;
-  
-  /*Pivotal Change to final*/
-  private static final SelectorPool selector = new SelectorPool();
-  
-  /*Pivotal Addition*/
-  //in seconds, the cleanup thread will first mark at the SELECTOR_TIME_OUT interval and
-  //close any selectors that have been marked on the next pass through
-  //This means that it will take approximately twice as long as SELECTOR_TIME_OUT to actually close
-  //an unused selector
-  private static final long SELECTOR_TIME_OUT = Long.getLong("gemfire.SELECTOR_TIME_OUT", 120L); 
-  
-  private static ScheduledExecutorService cleanUpExecutor = startSelectorCleanUpThread();
-
-  /*End Pivotal Addition*/
-  
-  /* A timeout value of 0 implies wait for ever. 
-   * We should have a value of timeout that implies zero wait.. i.e. 
-   * read or write returns immediately.
-   * 
-   * This will set channel to non-blocking.
-   */
-  SocketIOWithTimeout(SelectableChannel channel, long timeout) 
-                                                 throws IOException {
-    checkChannelValidity(channel);
-    
-    this.channel = channel;
-    this.timeout = timeout;
-    // Set non-blocking
-    channel.configureBlocking(false);
-  }
-  
-  void close() {
-    closed = true;
-  }
-
-  boolean isOpen() {
-    return !closed && channel.isOpen();
-  }
-
-  SelectableChannel getChannel() {
-    return channel;
-  }
-  
-  /** 
-   * Utility function to check if channel is ok.
-   * Mainly to throw IOException instead of runtime exception
-   * in case of mismatch. This mismatch can occur for many runtime
-   * reasons.
-   */
-  static void checkChannelValidity(Object channel) throws IOException {
-    if (channel == null) {
-      /* Most common reason is that original socket does not have a channel.
-       * So making this an IOException rather than a RuntimeException.
-       */
-      throw new IOException("Channel is null. Check " +
-                            "how the channel or socket is created.");
-    }
-    
-    if (!(channel instanceof SelectableChannel)) {
-      throw new IOException("Channel should be a SelectableChannel");
-    }    
-  }
-  
-  /**
-   * Performs actual IO operations. This is not expected to block.
-   *  
-   * @param buf
-   * @return number of bytes (or some equivalent). 0 implies underlying
-   *         channel is drained completely. We will wait if more IO is 
-   *         required.
-   * @throws IOException
-   */
-  abstract int performIO(ByteBuffer buf) throws IOException;  
-  
-  /**
-   * Performs one IO and returns number of bytes read or written.
-   * It waits up to the specified timeout. If the channel is 
-   * not read before the timeout, SocketTimeoutException is thrown.
-   * 
-   * @param buf buffer for IO
-   * @param ops Selection Ops used for waiting. Suggested values: 
-   *        SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
-   *        writing. 
-   *        
-   * @return number of bytes read or written. negative implies end of stream.
-   * @throws IOException
-   */
-  int doIO(ByteBuffer buf, int ops) throws IOException {
-    
-    /* For now only one thread is allowed. If user want to read or write
-     * from multiple threads, multiple streams could be created. In that
-     * case multiple threads work as well as underlying channel supports it.
-     */
-    if (!buf.hasRemaining()) {
-      throw new IllegalArgumentException("Buffer has no data left.");
-      //or should we just return 0?
-    }
-
-    while (buf.hasRemaining()) {
-      if (closed) {
-        return -1;
-      }
-
-      try {
-        int n = performIO(buf);
-        if (n != 0) {
-          // successful io or an error.
-          return n;
-        }
-      } catch (IOException e) {
-        if (!channel.isOpen()) {
-          closed = true;
-        }
-        throw e;
-      }
-
-      //now wait for socket to be ready.
-      int count = 0;
-      try {
-        count = selector.select(channel, ops, timeout);  
-      } catch (IOException e) { //unexpected IOException.
-        closed = true;
-        throw e;
-      } 
-
-      if (count == 0) {
-        throw new SocketTimeoutException(timeoutExceptionString(channel,
-                                                                timeout, ops));
-      }
-      // otherwise the socket should be ready for io.
-    }
-    
-    return 0; // does not reach here.
-  }
-  
-  /**
-   * The contract is similar to {@link SocketChannel#connect(SocketAddress)} 
-   * with a timeout.
-   * 
-   * @see SocketChannel#connect(SocketAddress)
-   * 
-   * @param channel - this should be a {@link SelectableChannel}
-   * @param endpoint
-   * @throws IOException
-   */
-  static void connect(SocketChannel channel, 
-                      SocketAddress endpoint, int timeout) throws IOException {
-    
-    boolean blockingOn = channel.isBlocking();
-    if (blockingOn) {
-      channel.configureBlocking(false);
-    }
-    
-    try { 
-      if (channel.connect(endpoint)) {
-        return;
-      }
-
-      long timeoutLeft = timeout;
-      long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
-      
-      while (true) {
-        // we might have to call finishConnect() more than once
-        // for some channels (with user level protocols)
-        
-        int ret = selector.select((SelectableChannel)channel, 
-                                  SelectionKey.OP_CONNECT, timeoutLeft);
-        
-        if (ret > 0 && channel.finishConnect()) {
-          return;
-        }
-        
-        if (ret == 0 ||
-            (timeout > 0 &&  
-              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
-          throw new SocketTimeoutException(
-                    timeoutExceptionString(channel, timeout, 
-                                           SelectionKey.OP_CONNECT));
-        }
-      }
-    } catch (IOException e) {
-      // javadoc for SocketChannel.connect() says channel should be closed.
-      try {
-        channel.close();
-      } catch (IOException ignored) {}
-      throw e;
-    } finally {
-      if (blockingOn && channel.isOpen()) {
-        channel.configureBlocking(true);
-      }
-    }
-  }
-  
-  /**
-   * This is similar to doIO(ByteBuffer, int)} except that it
-   * does not perform any I/O. It just waits for the channel to be ready
-   * for I/O as specified in ops.
-   * 
-   * @param ops Selection Ops used for waiting
-   * 
-   * @throws SocketTimeoutException 
-   *         if select on the channel times out.
-   * @throws IOException
-   *         if any other I/O error occurs. 
-   */
-  void waitForIO(int ops) throws IOException {
-    
-    if (selector.select(channel, ops, timeout) == 0) {
-      throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
-                                                              ops)); 
-    }
-  }
-
-  public void setTimeout(long timeoutMs) {
-    this.timeout = timeoutMs;
-  }
-    
-  private static String timeoutExceptionString(SelectableChannel channel,
-                                               long timeout, int ops) {
-    
-    String waitingFor;
-    switch(ops) {
-    
-    case SelectionKey.OP_READ :
-      waitingFor = "read"; break;
-      
-    case SelectionKey.OP_WRITE :
-      waitingFor = "write"; break;      
-      
-    case SelectionKey.OP_CONNECT :
-      waitingFor = "connect"; break;
-      
-    default :
-      waitingFor = "" + ops;  
-    }
-    
-    return timeout + " millis timeout while " +
-           "waiting for channel to be ready for " + 
-           waitingFor + ". ch : " + channel;    
-  }
-  
-  public static ScheduledExecutorService startSelectorCleanUpThread() {
-    ScheduledExecutorService cleanUpExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-      public Thread newThread(final Runnable r) {
-        Thread result = new Thread(r, "selector-pool-cleanup");
-        result.setDaemon(true);
-        return result;
-      }
-    });
-    cleanUpExecutor.scheduleAtFixedRate(new Runnable(){
-        public void run() {
-          selector.trimIdleSelectors();
-        }
-      }, SELECTOR_TIME_OUT, SELECTOR_TIME_OUT, TimeUnit.SECONDS);
-    return cleanUpExecutor;
-  }
-  
-  public static void stopSelectorCleanUpThread() {
-    if (cleanUpExecutor != null) {
-      cleanUpExecutor.shutdownNow();
-    }
-  }
-  /**
-   * This maintains a pool of selectors. These selectors are closed
-   * once they are idle (unused) for a few seconds.
-   */
-  private static class SelectorPool {
-    
-    private static class SelectorInfo {
-      Selector              selector;
-      /**Pivotal Addition**/
-      LinkedBlockingDeque<SelectorInfo> deque;
-      volatile boolean markForClean = false;
-      /**End Pivotal Addition**/
- 
-      void close() {
-        if (selector != null) {
-          try {
-            selector.close();
-          } catch (IOException e) {
-            logger.warn("Unexpected exception while closing selector : ", e);
-          }
-        }
-      }    
-    }
-    
-    private final ConcurrentHashMap<SelectorProvider, LinkedBlockingDeque<SelectorInfo>> providerList = new ConcurrentHashMap<SelectorProvider, LinkedBlockingDeque<SelectorInfo>>();
-    
-    /**
-     * Waits on the channel with the given timeout using one of the 
-     * cached selectors. It also removes any cached selectors that are
-     * idle for a few seconds.
-     * 
-     * @param channel
-     * @param ops
-     * @param timeout
-     * @throws IOException
-     */
-    int select(SelectableChannel channel, int ops, long timeout) 
-                                                   throws IOException {
-     
-      SelectorInfo info = get(channel);
-      
-      SelectionKey key = null;
-      int ret = 0;
-      
-      try {
-        while (true) {
-          long start = (timeout == 0) ? 0 : System.currentTimeMillis();
-
-          key = channel.register(info.selector, ops);
-          ret = info.selector.select(timeout);
-          
-          if (ret != 0) {
-            return ret;
-          }
-          
-          /* Sometimes select() returns 0 much before timeout for 
-           * unknown reasons. So select again if required.
-           */
-          if (timeout > 0) {
-            timeout -= System.currentTimeMillis() - start;
-            if (timeout <= 0) {
-              return 0;
-            }
-          }
-          
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedIOException("Interruped while waiting for " +
-                                             "IO on channel " + channel +
-                                             ". " + timeout + 
-                                             " millis timeout left.");
-          }
-        }
-      } finally {
-        if (key != null) {
-          key.cancel();
-        }
-        
-        //clear the canceled key.
-        try {
-          info.selector.selectNow();
-        } catch (IOException e) {
-          logger.info("Unexpected Exception while clearing selector : ", e); // TODO:WTF: why is this info level??
-          // don't put the selector back.
-          info.close();
-          return ret; 
-        }
-        
-        release(info);
-      }
-    }
-
-    /**
-     * Takes one selector from end of LRU list of free selectors.
-     * If there are no selectors awailable, it creates a new selector.
-     * 
-     * @param channel 
-     * @throws IOException
-     */
-    private SelectorInfo get(SelectableChannel channel) 
-                                                         throws IOException {
-      SelectorProvider provider = channel.provider();
-      
-      /**Pivotal Change**/
-      LinkedBlockingDeque<SelectorInfo> deque = providerList.get(provider);
-      if (deque == null) {
-        deque = new LinkedBlockingDeque<SelectorInfo>();
-        LinkedBlockingDeque<SelectorInfo> presentValue = providerList.putIfAbsent(provider, deque); 
-        if (presentValue != null && deque != presentValue) {
-          deque = presentValue;
-        }
-      } 
-      /**poll instead of check empty**/       
-      
-      SelectorInfo selInfo = deque.pollFirst(); 
-      if (selInfo != null) {
-        selInfo.markForClean = false;
-      } else {
-        Selector selector = provider.openSelector();
-        selInfo = new SelectorInfo();
-        selInfo.selector = selector;
-        selInfo.deque = deque;
-      }
-      
-      /**end Pivotal Change**/
-      return selInfo;
-    }
-    
-    /**
-     * puts selector back at the end of LRU list of free selectos.
-     * 
-     * @param info
-     */
-    private void release(SelectorInfo info) {
-      /**Pivotal Addition **/
-      info.deque.addFirst(info);
-      /**End Pivotal Addition **/
-    }
-    
-    private void trimIdleSelectors() {
-      Iterator<LinkedBlockingDeque<SocketIOWithTimeout.SelectorPool.SelectorInfo>> poolIterator = providerList.values().iterator();
-      while (poolIterator.hasNext()) {
-        LinkedBlockingDeque<SelectorInfo> selectorPool = poolIterator.next();
-        trimSelectorPool(selectorPool);
-      }
-    }
-        
-    private void trimSelectorPool(LinkedBlockingDeque<SelectorInfo> selectorPool) {
-      SelectorInfo selectorInfo = selectorPool.peekLast();
-      //iterate backwards and remove any selectors that have been marked
-      //once we hit a selector that has yet to be marked, we can then mark the remaining
-      while (selectorInfo != null && selectorInfo.markForClean) {
-        selectorInfo = selectorPool.pollLast();
-        //check the flag again just to be sure
-        if (selectorInfo.markForClean ) {
-          selectorInfo.close();
-        }
-        else {
-          selectorPool.addFirst(selectorInfo);
-        }
-        selectorInfo = selectorPool.peekLast();
-      }
-      
-      //Mark all the selectors
-      Iterator<SelectorInfo> selectorIterator = selectorPool.iterator();
-      while (selectorIterator.hasNext()) {
-        selectorIterator.next().markForClean = true;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java
deleted file mode 100644
index 430294a..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Pivotal Additions:
- * Removed the usage of import org.apache.hadoop.classification.InterfaceAudience
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-
-/**
- * This implements an input stream that can have a timeout while reading.
- * This sets non-blocking flag on the socket channel.
- * So after create this object, read() on 
- * {@link Socket#getInputStream()} and write() on 
- * {@link Socket#getOutputStream()} for the associated socket will throw 
- * IllegalBlockingModeException. 
- * Please use {@link SocketOutputStream} for writing.
- */
-//@InterfaceAudience.LimitedPrivate("HDFS")
-public class SocketInputStream extends InputStream
-                               implements ReadableByteChannel {
-
-  private Reader reader;
-
-  private static class Reader extends SocketIOWithTimeout {
-    ReadableByteChannel channel;
-    
-    Reader(ReadableByteChannel channel, long timeout) throws IOException {
-      super((SelectableChannel)channel, timeout);
-      this.channel = channel;
-    }
-    
-    @Override
-    int performIO(ByteBuffer buf) throws IOException {
-      return channel.read(buf);
-    }
-  }
-  
-  /**
-   * Create a new input stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @param channel 
-   *        Channel for reading, should also be a {@link SelectableChannel}.
-   *        The channel will be configured to be non-blocking.
-   * @param timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketInputStream(ReadableByteChannel channel, long timeout)
-                                                        throws IOException {
-    SocketIOWithTimeout.checkChannelValidity(channel);
-    reader = new Reader(channel, timeout);
-  }
-
-  /**
-   * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
-   * 
-   * Create a new input stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
-   *  
-   * @param socket should have a channel associated with it.
-   * @param timeout timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketInputStream(Socket socket, long timeout) 
-                                         throws IOException {
-    this(socket.getChannel(), timeout);
-  }
-  
-  /**
-   * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
-   * :<br><br>
-   * 
-   * Create a new input stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
-   *  
-   * @param socket should have a channel associated with it.
-   * @throws IOException
-   */
-  public SocketInputStream(Socket socket) throws IOException {
-    this(socket.getChannel(), socket.getSoTimeout());
-  }
-  
-  @Override
-  public int read() throws IOException {
-    /* Allocation can be removed if required.
-     * probably no need to optimize or encourage single byte read.
-     */
-    byte[] buf = new byte[1];
-    int ret = read(buf, 0, 1);
-    if (ret > 0) {
-      return (int)(buf[0] & 0xff);
-    }
-    if (ret != -1) {
-      // unexpected
-      throw new IOException("Could not read from stream");
-    }
-    return ret;
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return read(ByteBuffer.wrap(b, off, len));
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    /* close the channel since Socket.getInputStream().close()
-     * closes the socket.
-     */
-    reader.channel.close();
-    reader.close();
-  }
-
-  /**
-   * Returns underlying channel used by inputstream.
-   * This is useful in certain cases like channel for 
-   * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
-   */
-  public ReadableByteChannel getChannel() {
-    return reader.channel; 
-  }
-  
-  //ReadableByteChannel interface
-    
-  @Override
-  public boolean isOpen() {
-    return reader.isOpen();
-  }
-    
-  @Override
-  public int read(ByteBuffer dst) throws IOException {
-    return reader.doIO(dst, SelectionKey.OP_READ);
-  }
-  
-  /**
-   * waits for the underlying channel to be ready for reading.
-   * The timeout specified for this stream applies to this wait.
-   * 
-   * @throws SocketTimeoutException 
-   *         if select on the channel times out.
-   * @throws IOException
-   *         if any other I/O error occurs. 
-   */
-  public void waitForReadable() throws IOException {
-    reader.waitForIO(SelectionKey.OP_READ);
-  }
-
-  public void setTimeout(long timeoutMs) {
-    reader.setTimeout(timeoutMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java
deleted file mode 100644
index 0bc0ecd..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * Pivotal Additions:
- * Removed Preconditions and classifications
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.FilterInputStream;
-
-import java.io.InputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.channels.ReadableByteChannel;
-
-//import org.apache.hadoop.classification.InterfaceAudience;
-//import org.apache.hadoop.classification.InterfaceStability;
-
-//import com.google.common.base.Preconditions;
-
-/**
- * A wrapper stream around a socket which allows setting of its timeout. If the
- * socket has a channel, this uses non-blocking IO via the package-private
- * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by
- * setting the underlying socket timeout itself.
- */
-/*
-@InterfaceAudience.LimitedPrivate("HDFS")
-@InterfaceStability.Unstable
-*/
-public class SocketInputWrapper extends FilterInputStream {
-  private final Socket socket;
-  private final boolean hasChannel;
-
-  SocketInputWrapper(Socket s, InputStream is) {
-    super(is);
-    this.socket = s;
-    this.hasChannel = s.getChannel() != null;
-//    if (hasChannel) {
-//      Preconditions.checkArgument(is instanceof SocketInputStream,
-//          "Expected a SocketInputStream when there is a channel. " +
-//          "Got: %s", is);
-//    }
-  }
-
-  /**
-   * Set the timeout for reads from this stream.
-   * 
-   * Note: the behavior here can differ subtly depending on whether the
-   * underlying socket has an associated Channel. In particular, if there is no
-   * channel, then this call will affect the socket timeout for <em>all</em>
-   * readers of this socket. If there is a channel, then this call will affect
-   * the timeout only for <em>this</em> stream. As such, it is recommended to
-   * only create one {@link SocketInputWrapper} instance per socket.
-   * 
-   * @param timeoutMs
-   *          the new timeout, 0 for no timeout
-   * @throws SocketException
-   *           if the timeout cannot be set
-   */
-  public void setTimeout(long timeoutMs) throws SocketException {
-    if (hasChannel) {
-      ((SocketInputStream)in).setTimeout(timeoutMs);
-    } else {
-      socket.setSoTimeout((int)timeoutMs);
-    }
-  }
-
-  /**
-   * @return an underlying ReadableByteChannel implementation.
-   * @throws IllegalStateException if this socket does not have a channel
-   */
-  public ReadableByteChannel getReadableByteChannel() {
-//    Preconditions.checkState(hasChannel,
-//        "Socket %s does not have a channel",
-//        this.socket);
-    return (SocketInputStream)in;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java
deleted file mode 100644
index 412635c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * Pivotal Additions:
- * Removed classifications
- * Removed method transferToFully
- * 
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * This implements an output stream that can have a timeout while writing.
- * This sets non-blocking flag on the socket channel.
- * So after creating this object , read() on 
- * {@link Socket#getInputStream()} and write() on 
- * {@link Socket#getOutputStream()} on the associated socket will throw 
- * llegalBlockingModeException.
- * Please use {@link SocketInputStream} for reading.
- */
-public class SocketOutputStream extends OutputStream 
-                                implements WritableByteChannel {                                
-  
-  private Writer writer;
-  
-  private static class Writer extends SocketIOWithTimeout {
-    WritableByteChannel channel;
-    
-    Writer(WritableByteChannel channel, long timeout) throws IOException {
-      super((SelectableChannel)channel, timeout);
-      this.channel = channel;
-    }
-    
-    @Override
-    int performIO(ByteBuffer buf) throws IOException {
-      return channel.write(buf);
-    }
-  }
-  
-  /**
-   * Create a new ouput stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @param channel 
-   *        Channel for writing, should also be a {@link SelectableChannel}.  
-   *        The channel will be configured to be non-blocking.
-   * @param timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketOutputStream(WritableByteChannel channel, long timeout) 
-                                                         throws IOException {
-    SocketIOWithTimeout.checkChannelValidity(channel);
-    writer = new Writer(channel, timeout);
-  }
-  
-  /**
-   * Same as SocketOutputStream(socket.getChannel(), timeout):<br><br>
-   * 
-   * Create a new ouput stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
-   *  
-   * @param socket should have a channel associated with it.
-   * @param timeout timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketOutputStream(Socket socket, long timeout) 
-                                         throws IOException {
-    this(socket.getChannel(), timeout);
-  }
-  
-  @Override
-  public void write(int b) throws IOException {
-    /* If we need to, we can optimize this allocation.
-     * probably no need to optimize or encourage single byte writes.
-     */
-    byte[] buf = new byte[1];
-    buf[0] = (byte)b;
-    write(buf, 0, 1);
-  }
-  
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    ByteBuffer buf = ByteBuffer.wrap(b, off, len);
-    while (buf.hasRemaining()) {
-      try {
-        if (write(buf) < 0) {
-          throw new IOException("The stream is closed");
-        }
-      } catch (IOException e) {
-        /* Unlike read, write can not inform user of partial writes.
-         * So will close this if there was a partial write.
-         */
-        if (buf.capacity() > buf.remaining()) {
-          writer.close();
-        }
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    /* close the channel since Socket.getOuputStream().close() 
-     * closes the socket.
-     */
-    writer.channel.close();
-    writer.close();
-  }
-
-  /**
-   * Returns underlying channel used by this stream.
-   * This is useful in certain cases like channel for 
-   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}
-   */
-  public WritableByteChannel getChannel() {
-    return writer.channel; 
-  }
-
-  //WritableByteChannle interface 
-  
-  @Override
-  public boolean isOpen() {
-    return writer.isOpen();
-  }
-
-  @Override
-  public int write(ByteBuffer src) throws IOException {
-    return writer.doIO(src, SelectionKey.OP_WRITE);
-  }
-  
-  /**
-   * waits for the underlying channel to be ready for writing.
-   * The timeout specified for this stream applies to this wait.
-   *
-   * @throws SocketTimeoutException 
-   *         if select on the channel times out.
-   * @throws IOException
-   *         if any other I/O error occurs. 
-   */
-  public void waitForWritable() throws IOException {
-    writer.waitForIO(SelectionKey.OP_WRITE);
-  }
-
-  public void setTimeout(int timeoutMs) {
-    writer.setTimeout(timeoutMs);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java
deleted file mode 100644
index 36eaf04..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * Pivotal Additions:
- * Flag to enable/disable selector pooling for test purposes
- * 
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.SocketChannel;
-
-public class SocketUtils {
-
-  //used for testing
-  public static boolean USE_SELECTOR_POOLING = Boolean.valueOf(System.getProperty("gemfire.useSelectorPooling", "true")).booleanValue();
-  /**
-   * This is a drop-in replacement for 
-   * {@link Socket#connect(SocketAddress, int)}.
-   * In the case of normal sockets that don't have associated channels, this 
-   * just invokes <code>socket.connect(endpoint, timeout)</code>. If 
-   * <code>socket.getChannel()</code> returns a non-null channel,
-   * connect is implemented using Hadoop's selectors. This is done mainly
-   * to avoid Sun's connect implementation from creating thread-local 
-   * selectors, since Hadoop does not have control on when these are closed
-   * and could end up taking all the available file descriptors.
-   * 
-   * @see java.net.Socket#connect(java.net.SocketAddress, int)
-   * 
-   * @param socket
-   * @param address the remote address
-   * @param timeout timeout in milliseconds
-   */
-  public static void connect(Socket socket,
-      SocketAddress address,
-      int timeout) throws IOException {
-    connect(socket, address, null, timeout);
-  }
-  
-  /**
-   * Like SocketUtils.connect(Socket, SocketAddress, int) but
-   * also takes a local address and port to bind the socket to. 
-   * 
-   * @param socket
-   * @param endpoint the remote address
-   * @param localAddr the local address to bind the socket to
-   * @param timeout timeout in milliseconds
-   */
-  public static void connect(Socket socket, 
-                             SocketAddress endpoint,
-                             SocketAddress localAddr,
-                             int timeout) throws IOException {
-    if (socket == null || endpoint == null || timeout < 0) {
-      throw new IllegalArgumentException("Illegal argument for connect()");
-    }
-    SocketChannel ch = socket.getChannel();
-    
-    if (localAddr != null) {
-      socket.bind(localAddr);
-    }
-
-    try {
-      if (ch == null) {
-        // let the default implementation handle it.
-        socket.connect(endpoint, timeout);
-      } else {
-        if (USE_SELECTOR_POOLING) {
-          SocketIOWithTimeout.connect(ch, endpoint, timeout);
-        }
-        else {
-          socket.connect(endpoint, timeout);
-        }
-
-      }
-    } catch (SocketTimeoutException ste) {
-      throw new IOException(ste.getMessage());
-    }
-
-    /*
-     Pivotal Change: due to ticket #50734
-    // There is a very rare case allowed by the TCP specification, such that
-    // if we are trying to connect to an endpoint on the local machine,
-    // and we end up choosing an ephemeral port equal to the destination port,
-    // we will actually end up getting connected to ourself (ie any data we
-    // send just comes right back). This is only possible if the target
-    // daemon is down, so we'll treat it like connection refused.
-    if (socket.getLocalPort() == socket.getPort() &&
-        socket.getLocalAddress().equals(socket.getInetAddress())) {
-      socket.close();
-      throw new ConnectException(
-        "Localhost targeted connection resulted in a loopback. " +
-        "No daemon is listening on the target port.");
-    }
-    */
-  }
-  
-  /**
-   * Same as <code>getInputStream(socket, socket.getSoTimeout()).</code>
-   * <br><br>
-   * 
-   * @see #getInputStream(Socket, long)
-   */
-  public static InputStream getInputStream(Socket socket) 
-                                           throws IOException {
-    return getInputStream(socket, socket.getSoTimeout());
-  }
-
-  /**
-   * Return a {@link SocketInputWrapper} for the socket and set the given
-   * timeout. If the socket does not have an associated channel, then its socket
-   * timeout will be set to the specified value. Otherwise, a
-   * {@link SocketInputStream} will be created which reads with the configured
-   * timeout.
-   * 
-   * Any socket created using socket factories returned by {@link #SocketUtils},
-   * must use this interface instead of {@link Socket#getInputStream()}.
-   * 
-   * In general, this should be called only once on each socket: see the note
-   * in {@link SocketInputWrapper#setTimeout(long)} for more information.
-   *
-   * @see Socket#getChannel()
-   * 
-   * @param socket
-   * @param timeout timeout in milliseconds. zero for waiting as
-   *                long as necessary.
-   * @return SocketInputWrapper for reading from the socket.
-   * @throws IOException
-   */
-  /*Pivotal Addition
-   * Return type changed to InputStream instead of SocketInputWrapper
-   * Returning the regular inputstream if a channel is not present and does
-   * not wrap that around an input wrapper
-   */
-  public static InputStream getInputStream(Socket socket, long timeout) 
-                                           throws IOException {
-    if (socket.getChannel() == null || ! USE_SELECTOR_POOLING) {
-      return socket.getInputStream();
-    }
-    else {
-      SocketInputWrapper w = new SocketInputWrapper(socket, new SocketInputStream(socket));
-      w.setTimeout(timeout);
-      return w;
-    }
-  }
-  
-  /**
-   * Same as getOutputStream(socket, 0). Timeout of zero implies write will
-   * wait until data is available.<br><br>
-   * 
-   * From documentation for {@link #getOutputStream(Socket, long)} : <br>
-   * Returns OutputStream for the socket. If the socket has an associated
-   * SocketChannel then it returns a 
-   * {@link SocketOutputStream} with the given timeout. If the socket does not
-   * have a channel, {@link Socket#getOutputStream()} is returned. In the later
-   * case, the timeout argument is ignored and the write will wait until 
-   * data is available.<br><br>
-   * 
-   * Any socket created using socket factories returned by {@link SocketUtils},
-   * must use this interface instead of {@link Socket#getOutputStream()}.
-   * 
-   * @see #getOutputStream(Socket, long)
-   * 
-   * @param socket
-   * @return OutputStream for writing to the socket.
-   * @throws IOException
-   */  
-  public static OutputStream getOutputStream(Socket socket) 
-                                             throws IOException {
-    return getOutputStream(socket, 0);
-  }
-  
-  /**
-   * Returns OutputStream for the socket. If the socket has an associated
-   * SocketChannel then it returns a 
-   * {@link SocketOutputStream} with the given timeout. If the socket does not
-   * have a channel, {@link Socket#getOutputStream()} is returned. In the later
-   * case, the timeout argument is ignored and the write will wait until 
-   * data is available.<br><br>
-   * 
-   * Any socket created using socket factories returned by {@link SocketUtils},
-   * must use this interface instead of {@link Socket#getOutputStream()}.
-   * 
-   * @see Socket#getChannel()
-   * 
-   * @param socket
-   * @param timeout timeout in milliseconds. This may not always apply. zero
-   *        for waiting as long as necessary.
-   * @return OutputStream for writing to the socket.
-   * @throws IOException   
-   */
-  public static OutputStream getOutputStream(Socket socket, long timeout) 
-                                             throws IOException {
-    if (socket.getChannel() == null || !USE_SELECTOR_POOLING) {
-      return socket.getOutputStream();      
-    }
-    else {
-      return new SocketOutputStream(socket, timeout); 
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 05bc838..cc9727b 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,6 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ClassPathLoader;
 import com.gemstone.gemfire.internal.JarDeployer;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketIOWithTimeout;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
@@ -2072,9 +2071,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           // ignore
         }
 
-          GatewaySenderAdvisor advisor = null;
-          for (GatewaySender sender : this.getAllGatewaySenders()) {
-            try {
+        GatewaySenderAdvisor advisor = null;
+        for (GatewaySender sender : this.getAllGatewaySenders()) {
+          try {
             sender.stop();
             advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
             if (advisor != null) {
@@ -2083,10 +2082,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
               }
               advisor.close();
             }
-            } catch (CancelException ce) {
-            }
+          } catch (CancelException ce) {
           }
-          ParallelGatewaySenderQueue.cleanUpStatics(null);
+        }
+        ParallelGatewaySenderQueue.cleanUpStatics(null);
 
         destroyGatewaySenderLockService();
 
@@ -2350,7 +2349,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       SequenceLoggerImpl.signalCacheClose();
       SystemFailure.signalCacheClose();
       
-      SocketIOWithTimeout.stopSelectorCleanUpThread();
     } // static synchronization on GemFireCache.class
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
index 5b20e86..eeb611e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
@@ -74,7 +74,6 @@ import com.gemstone.gemfire.distributed.internal.LonerDistributionManager;
 import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
 import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.cache.BucketAdvisor;
 import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile;
@@ -1483,7 +1482,7 @@ public class AcceptorImpl extends Acceptor implements Runnable
       }
     } else {
       s.setSoTimeout(this.acceptTimeout);
-      communicationMode = (byte)SocketUtils.getInputStream(s).read();//getInputStream().read();
+      communicationMode = (byte)s.getInputStream().read();
       if (logger.isTraceEnabled()) {
         logger.trace("read communications mode(2) ", communicationMode);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 3178b8d..1ba2294 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -17,101 +17,23 @@
 
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.Instantiator;
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.StatisticsFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheEvent;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.InterestRegistrationEvent;
-import com.gemstone.gemfire.cache.InterestRegistrationListener;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.RegionExistsException;
-import com.gemstone.gemfire.cache.UnsupportedVersionException;
+import com.gemstone.gemfire.*;
+import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
 import com.gemstone.gemfire.cache.query.CqException;
 import com.gemstone.gemfire.cache.query.Query;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
-import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
 import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DM;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.MessageWithReply;
-import com.gemstone.gemfire.distributed.internal.ReplyMessage;
-import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
-import com.gemstone.gemfire.internal.ClassLoadUtil;
-import com.gemstone.gemfire.internal.DummyStatisticsFactory;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.InternalInstantiator;
-import com.gemstone.gemfire.internal.SocketCloser;
-import com.gemstone.gemfire.internal.SocketUtils;
-import com.gemstone.gemfire.internal.SystemTimer;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.VersionedDataInputStream;
-import com.gemstone.gemfire.internal.VersionedDataOutputStream;
-import com.gemstone.gemfire.internal.cache.ClientServerObserver;
-import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
-import com.gemstone.gemfire.internal.cache.ClientRegionEventImpl;
-import com.gemstone.gemfire.internal.cache.CacheServerImpl;
-import com.gemstone.gemfire.internal.cache.CacheClientStatus;
-import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor;
-import com.gemstone.gemfire.internal.cache.CachedDeserializable;
-import com.gemstone.gemfire.internal.cache.Conflatable;
-import com.gemstone.gemfire.internal.cache.DistributedRegion;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
-import com.gemstone.gemfire.internal.cache.EventID;
-import com.gemstone.gemfire.internal.cache.FilterProfile;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl.SerializedCacheValueImpl;
+import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.internal.*;
+import com.gemstone.gemfire.internal.cache.*;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalCacheEvent;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.RegionEventImpl;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerMap;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
-import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
-import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.ha.*;
 import com.gemstone.gemfire.internal.cache.tier.Acceptor;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -122,6 +44,15 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.security.AccessControl;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
 import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import org.apache.logging.log4j.Logger;
+
+import java.io.*;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Class <code>CacheClientNotifier</code> works on the server and manages
@@ -314,8 +245,8 @@ public class CacheClientNotifier {
   {
     // Since no remote ports were specified in the message, wait for them.
     long startTime = this._statistics.startTime();
-    DataInputStream dis = new DataInputStream(SocketUtils.getInputStream(socket));//socket.getInputStream());
-    DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(socket));//socket.getOutputStream());
+    DataInputStream dis = new DataInputStream(socket.getInputStream());
+    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
 
     // Read the client version
     short clientVersionOrdinal = Version.readOrdinal(dis);
@@ -607,7 +538,7 @@ public class CacheClientNotifier {
     // is attempted to be registered or authentication fails.
     try {
       DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(
-          SocketUtils.getOutputStream(socket)));//socket.getOutputStream()));
+          socket.getOutputStream()));
       // write the message type, message length and the error message (if any)
       writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
index 3e43b69..8968f62 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -71,7 +71,6 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.ClientServerObserver;
@@ -328,8 +327,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater,
 
       // set the timeout for the handshake
       mySock.setSoTimeout(handshakeTimeout);
-      tmpOut = SocketUtils.getOutputStream(mySock);
-      tmpIn = SocketUtils.getInputStream(mySock);
+      tmpOut = mySock.getOutputStream();
+      tmpIn = mySock.getInputStream();
 
       if (isDebugEnabled) {
         logger.debug("Initialized server-to-client socket with send buffer size: {} bytes and receive buffer size: {} bytes", mySock.getSendBufferSize(), mySock.getReceiveBufferSize());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
old mode 100644
new mode 100755
index 2ea6ca0..909b133
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
@@ -80,7 +80,6 @@ import com.gemstone.gemfire.internal.ClassLoadUtil;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.VersionedDataInputStream;
 import com.gemstone.gemfire.internal.VersionedDataOutputStream;
@@ -257,7 +256,7 @@ public class HandShake implements ClientHandShake
       try {
         soTimeout = sock.getSoTimeout();
         sock.setSoTimeout(timeout);
-        InputStream is = SocketUtils.getInputStream(sock);//sock.getInputStream();
+        InputStream is = sock.getInputStream();
         int valRead =  is.read();
         //this.code =  (byte)is.read();
         if (valRead == -1) {
@@ -269,7 +268,7 @@ public class HandShake implements ClientHandShake
         }
         try {
           DataInputStream dis = new DataInputStream(is);
-          DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(sock));//sock.getOutputStream());
+          DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
           this.clientReadTimeout = dis.readInt();
           if (clientVersion.compareTo(Version.CURRENT) < 0) {
             // versioned streams allow object serialization code to deal with older clients
@@ -1272,8 +1271,8 @@ public class HandShake implements ClientHandShake
     try {
       ServerQueueStatus serverQStatus = null;
       Socket sock = conn.getSocket();
-      DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(sock));//sock.getOutputStream());
-      final InputStream in = SocketUtils.getInputStream(sock);//sock.getInputStream();
+      DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
+      final InputStream in = sock.getInputStream();
       DataInputStream dis = new DataInputStream(in);
       DistributedMember member = getDistributedMember(sock);
       // if running in a loner system, use the new port number in the ID to 
@@ -1378,8 +1377,8 @@ public class HandShake implements ClientHandShake
       AuthenticationFailedException, ServerRefusedConnectionException, ClassNotFoundException {
     ServerQueueStatus sqs = null;
     try {
-      DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(sock));//sock.getOutputStream());
-      final InputStream in = SocketUtils.getInputStream(sock);//sock.getInputStream());
+      DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
+      final InputStream in = sock.getInputStream();
       DataInputStream dis = new DataInputStream(in);
       DistributedMember member = getDistributedMember(sock);
       if (!this.multiuserSecureMode) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index bfe382c..94b4953 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -32,7 +32,6 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.SerializationException;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
@@ -1036,7 +1035,7 @@ public class Message  {
   public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
     this.sockCh = socket.getChannel();
     if (this.sockCh == null) {
-      setComms(socket, SocketUtils.getInputStream(socket), SocketUtils.getOutputStream(socket), bb, msgStats);
+      setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats);
     } else {
       setComms(socket, null, null,  bb, msgStats);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
old mode 100644
new mode 100755
index 1dd2562..e608db3
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
@@ -27,7 +27,6 @@ import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.security.Principal;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
@@ -42,13 +41,11 @@ import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.client.internal.AbstractOp;
 import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.EventID;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
@@ -630,7 +627,7 @@ public class ServerConnection implements Runnable {
   private boolean acceptHandShake(byte epType, int qSize)
   {
     try {
-      this.handshake.accept(SocketUtils.getOutputStream(theSocket), SocketUtils.getInputStream(this.theSocket)//this.theSocket
+      this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream()
           , epType, qSize, this.communicationMode,
           this.principal);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
index 07185b8..867f397 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
@@ -17,20 +17,6 @@
 
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.security.Principal;
-import java.util.Properties;
-
-import org.apache.logging.log4j.Logger;
-
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.IncompatibleVersionException;
 import com.gemstone.gemfire.cache.UnsupportedVersionException;
@@ -40,7 +26,6 @@ import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.VersionedDataStream;
 import com.gemstone.gemfire.internal.cache.tier.Acceptor;
@@ -52,6 +37,15 @@ import com.gemstone.gemfire.internal.security.AuthorizeRequest;
 import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
 import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import org.apache.logging.log4j.Logger;
+
+import java.io.*;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.security.Principal;
+import java.util.Properties;
 
 /**
  * A <code>ServerHandShakeProcessor</code> verifies the client's version compatibility with server.
@@ -422,7 +416,7 @@ public class ServerHandShakeProcessor {
     try {
       soTimeout = socket.getSoTimeout();
       socket.setSoTimeout(timeout);
-      InputStream is = SocketUtils.getInputStream(socket);//socket.getInputStream();
+      InputStream is = socket.getInputStream();
       short clientVersionOrdinal = Version.readOrdinalFromInputStream(is);
       if (clientVersionOrdinal == -1) {
         throw new EOFException(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
old mode 100644
new mode 100755
index d9dd826..f0fc27a
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
@@ -169,7 +169,7 @@ public abstract class NativeCalls {
       throw new UnsupportedOperationException(ex);
     }
 
-    // first try using SocketInputStream
+    // first try using FileInputStream
     if (sockStream instanceof FileInputStream) {
       try {
         fd = ((FileInputStream)sockStream).getFD();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
old mode 100644
new mode 100755
index bf44cd3..6e949b2
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -74,7 +74,6 @@ import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 import com.gemstone.gemfire.internal.Version;
@@ -1279,7 +1278,7 @@ public class Connection implements Runnable {
         int connectTime = getP2PConnectTimeout();; 
 
         try {
-          SocketUtils.connect(channel.socket(), addr, connectTime);
+          channel.socket().connect(addr, connectTime);
         } catch (NullPointerException e) {
           // bug #45044 - jdk 1.7 sometimes throws an NPE here
           ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
@@ -1330,7 +1329,7 @@ public class Connection implements Runnable {
         s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
         setReceiveBufferSize(s, SMALL_BUFFER_SIZE);
         setSendBufferSize(s);
-        SocketUtils.connect(s, addr, 0);
+        s.connect(addr, 0);
       }
     }
     if (logger.isDebugEnabled()) {


[04/18] incubator-geode git commit: GEODE-1221 Move no longer supported use case to test tree.

Posted by kl...@apache.org.
GEODE-1221 Move no longer supported use case to test tree.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6c033cfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6c033cfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6c033cfe

Branch: refs/heads/feature/GEODE-1162
Commit: 6c033cfe75de2e42d97bacf7cae122fc1adc8600
Parents: 90ab09c
Author: eshu <es...@pivotal.io>
Authored: Wed Apr 13 10:02:58 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Wed Apr 13 10:02:58 2016 -0700

----------------------------------------------------------------------
 .../cache/execute/util/CommitFunction.java      | 141 ------------------
 .../execute/util/NestedTransactionFunction.java | 116 ---------------
 .../cache/execute/util/RollbackFunction.java    | 136 ------------------
 .../cache/ClientServerTransactionDUnitTest.java |   2 -
 .../gemfire/internal/cache/CommitFunction.java  | 142 +++++++++++++++++++
 .../cache/NestedTransactionFunction.java        | 116 +++++++++++++++
 .../internal/cache/RollbackFunction.java        | 137 ++++++++++++++++++
 7 files changed, 395 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/CommitFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/CommitFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/CommitFunction.java
deleted file mode 100644
index 23139b4..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/CommitFunction.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.execute.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.execute.Execution;
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.FunctionException;
-import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
-import com.gemstone.gemfire.cache.execute.FunctionService;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.internal.cache.TXId;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * This function can be used by GemFire clients and peers to commit an existing
- * transaction. A {@link TransactionId} corresponding to the transaction to be
- * committed must be provided as an argument while invoking this function.<br />
- * 
- * This function should execute only on one server. If the transaction is not
- * hosted on the server where the function is invoked then this function decides
- * to invoke a nested {@link NestedTransactionFunction} which executes on the member where
- * transaction is hosted.<br />
- * 
- * This function returns a single Boolean as result, whose value is <code>Boolean.TRUE</code>
- * if the transaction committed successfully otherwise the return value is
- * <code>Boolean.FALSE</code>.<br />
- * 
- * To execute this function, it is recommended to use the {@link Execution} obtained by
- * using TransactionFunctionService. <br />
- * 
- * To summarize, this function should be used as follows:
- * 
- * <pre>
- * Execution exe = TransactionFunctionService.onTransaction(txId);
- * List l = (List) exe.execute(commitFunction).getResult();
- * Boolean result = (Boolean) l.get(0);
- * </pre>
- * 
- * This function is <b>not</b> registered on the cache servers by default, and
- * it is the user's responsibility to register this function. see
- * {@link FunctionService#registerFunction(Function)}
- * 
- * @since 6.6.1
- */
-public class CommitFunction implements Function {
-  private static final Logger logger = LogService.getLogger();
-
-  private static final long serialVersionUID = 7851518767859544501L;
-
-  public boolean hasResult() {
-    return true;
-  }
-
-  public void execute(FunctionContext context) {
-    Cache cache = CacheFactory.getAnyInstance();
-    TXId txId = null;
-    try {
-      txId = (TXId) context.getArguments();
-    } catch (ClassCastException e) {
-      logger.info("CommitFunction should be invoked with a TransactionId as an argument i.e. withArgs(txId).execute(function)");
-      throw e;
-    }
-    DistributedMember member = txId.getMemberId();
-    Boolean result = false;
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (cache.getDistributedSystem().getDistributedMember().equals(member)) {
-      if (isDebugEnabled) {
-        logger.debug("CommitFunction: for transaction: {} committing locally", txId);
-      }
-      CacheTransactionManager txMgr = cache.getCacheTransactionManager();
-      if (txMgr.tryResume(txId)) {
-        if (isDebugEnabled) {
-          logger.debug("CommitFunction: resumed transaction: {}", txId);
-        }
-        txMgr.commit();
-        result = true;
-      }
-    } else {
-      ArrayList args = new ArrayList();
-      args.add(txId);
-      args.add(NestedTransactionFunction.COMMIT);
-      Execution ex = FunctionService.onMember(cache.getDistributedSystem(),
-          member).withArgs(args);
-      if (isDebugEnabled) {
-        logger.debug("CommitFunction: for transaction: {} executing NestedTransactionFunction on member: {}", txId, member);
-      }
-      try {
-        List list = (List) ex.execute(new NestedTransactionFunction()).getResult();
-        result = (Boolean) list.get(0);
-      } catch (FunctionException fe) {
-        if (fe.getCause() instanceof FunctionInvocationTargetException) {
-          throw new TransactionDataNodeHasDepartedException("Could not commit on member:"+member);
-        } else {
-          throw fe;
-        }
-      }
-    }
-    if (isDebugEnabled) {
-      logger.debug("CommitFunction: for transaction: {} returning result: {}", txId, result);
-    }
-    context.getResultSender().lastResult(result);
-  }
-
-  public String getId() {
-    return getClass().getName();
-  }
-
-  public boolean optimizeForWrite() {
-    return true;
-  }
-
-  public boolean isHA() {
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/NestedTransactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/NestedTransactionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/NestedTransactionFunction.java
deleted file mode 100644
index 9e4aa9d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/NestedTransactionFunction.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.execute.util;
-
-import java.util.ArrayList;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.FunctionService;
-import com.gemstone.gemfire.internal.cache.TXId;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * This function is used by {@link CommitFunction} to commit existing transaction.
- * A {@link TransactionId} corresponding to the transaction to be
- * committed must be provided as an argument while invoking this function.<br />
- * 
- * When executed this function commits a transaction if it exists locally.<br />
- * 
- * This function returns a single Boolean as result, whose value is <code>Boolean.TRUE</code>
- * if the transaction committed successfully otherwise the return value is
- * <code>Boolean.FALSE</code><br />
- * 
- * This function is <b>not</b> registered on the cache servers by default, and
- * it is the user's responsibility to register this function. see
- * {@link FunctionService#registerFunction(Function)}
- * 
- * @see CommitFunction
- * @since 6.6.1
- *
- */
-public class NestedTransactionFunction implements Function {
-  private static final Logger logger = LogService.getLogger();
-
-  public static final int COMMIT = 1;
-  public static final int ROLLBACK = 2;
-  
-  private static final long serialVersionUID = 1400965724856341543L;
-
-  public boolean hasResult() {
-    return true;
-  }
-
-  public void execute(FunctionContext context) {
-    Cache cache = CacheFactory.getAnyInstance();
-    ArrayList args = (ArrayList) context.getArguments();
-    TXId txId = null;
-    int action = 0;
-    try {
-      txId = (TXId) args.get(0);
-      action = (Integer) args.get(1);
-    } catch (ClassCastException e) {
-      logger.info("CommitFunction should be invoked with a TransactionId as an argument i.e. withArgs(txId).execute(function)");
-      throw e;
-    }
-    CacheTransactionManager txMgr = cache.getCacheTransactionManager();
-    Boolean result = false;
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (txMgr.tryResume(txId)) {
-      if (isDebugEnabled) {
-        logger.debug("CommitFunction: resumed transaction: {}", txId);
-      }
-      if (action == COMMIT) {
-        if (isDebugEnabled) {
-          logger.debug("CommitFunction: committing transaction: {}", txId);
-        }
-        txMgr.commit();
-      } else if (action == ROLLBACK) {
-        if (isDebugEnabled) {
-          logger.debug("CommitFunction: rolling back transaction: {}", txId);
-        }
-        txMgr.rollback();
-      } else {
-        throw new IllegalStateException("unknown transaction termination action");
-      }
-      result = true;
-    }
-    if (isDebugEnabled) {
-      logger.debug("CommitFunction: for transaction: {} sending result: {}", txId, result);
-    }
-    context.getResultSender().lastResult(result);
-  }
-
-  public String getId() {
-    return getClass().getName();
-  }
-
-  public boolean optimizeForWrite() {
-    return true;
-  }
-
-  public boolean isHA() {
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/RollbackFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/RollbackFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/RollbackFunction.java
deleted file mode 100644
index f413d6c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/util/RollbackFunction.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.execute.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.execute.Execution;
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.FunctionException;
-import com.gemstone.gemfire.cache.execute.FunctionService;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.internal.cache.TXId;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * This function can be used by GemFire clients and peers to rollback an existing
- * transaction. A {@link TransactionId} corresponding to the transaction to be
- * rolledback must be provided as an argument while invoking this function.<br />
- * 
- * This function should execute only on one server. If the transaction is not
- * hosted on the server where the function is invoked then this function decides
- * to invoke a {@link NestedTransactionFunction} which executes on the member where
- * transaction is hosted.<br />
- * 
- * This function returns a single Boolean as result, whose value is <code>Boolean.TRUE</code>
- * if the transaction rolled back successfully otherwise the return value is
- * <code>Boolean.FALSE</code>.<br />
- * 
- * To execute this function, it is recommended to use the {@link Execution} obtained by
- * using TransactionFunctionService. <br />
- * 
- * To summarize, this function should be used as follows:
- * 
- * <pre>
- * Execution exe = TransactionFunctionService.onTransaction(txId);
- * List l = (List) exe.execute(rollbackFunction).getResult();
- * Boolean result = (Boolean) l.get(0);
- * </pre>
- * 
- * This function is <b>not</b> registered on the cache servers by default, and
- * it is the user's responsibility to register this function. see
- * {@link FunctionService#registerFunction(Function)}
- * 
- * @since 6.6.1
- */
-public class RollbackFunction implements Function {
-  private static final Logger logger = LogService.getLogger();
-
-  private static final long serialVersionUID = 1377183180063184795L;
-
-  public boolean hasResult() {
-    return true;
-  }
-
-  public void execute(FunctionContext context) {
-    Cache cache = CacheFactory.getAnyInstance();
-    TXId txId = null;
-    try {
-      txId = (TXId) context.getArguments();
-    } catch (ClassCastException e) {
-      logger.info("RollbackFunction should be invoked with a TransactionId as an argument i.e. withArgs(txId).execute(function)");
-      throw e;
-    }
-    DistributedMember member = txId.getMemberId();
-    Boolean result = false;
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (cache.getDistributedSystem().getDistributedMember().equals(member)) {
-      if (isDebugEnabled) {
-        logger.debug("RollbackFunction: for transaction: {} rolling back locally", txId);
-      }
-      CacheTransactionManager txMgr = cache.getCacheTransactionManager();
-      if (txMgr.tryResume(txId)) {
-        if (isDebugEnabled) {
-          logger.debug("RollbackFunction: resumed transaction: {}", txId);
-        }
-        txMgr.rollback();
-        result = true;
-      }
-    } else {
-      ArrayList args = new ArrayList();
-      args.add(txId);
-      args.add(NestedTransactionFunction.ROLLBACK);
-      Execution ex = FunctionService.onMember(cache.getDistributedSystem(),
-          member).withArgs(args);
-      if (isDebugEnabled) {
-        logger.debug("RollbackFunction: for transaction: {} executing NestedTransactionFunction on member: {}", txId, member);
-      }
-      try {
-        List list = (List) ex.execute(new NestedTransactionFunction()).getResult();
-        result = (Boolean) list.get(0);
-      } catch (FunctionException fe) {
-        throw new TransactionDataNodeHasDepartedException("Could not Rollback on member:"+member);
-      }
-    }
-    if (isDebugEnabled) {
-      logger.debug("RollbackFunction: for transaction: {} returning result: {}", txId, result);
-    }
-    context.getResultSender().lastResult(result);
-  }
-
-  public String getId() {
-    return getClass().getName();
-  }
-
-  public boolean optimizeForWrite() {
-    return true;
-  }
-
-  public boolean isHA() {
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
index 926a50a..672c5e6 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
@@ -85,8 +85,6 @@ import com.gemstone.gemfire.internal.cache.execute.data.CustId;
 import com.gemstone.gemfire.internal.cache.execute.data.Customer;
 import com.gemstone.gemfire.internal.cache.execute.data.Order;
 import com.gemstone.gemfire.internal.cache.execute.data.OrderId;
-import com.gemstone.gemfire.internal.cache.execute.util.CommitFunction;
-import com.gemstone.gemfire.internal.cache.execute.util.RollbackFunction;
 import com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.IgnoredException;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/CommitFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/CommitFunction.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/CommitFunction.java
new file mode 100644
index 0000000..051465d
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/CommitFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * This function can be used by GemFire clients and peers to commit an existing
+ * transaction. A {@link TransactionId} corresponding to the transaction to be
+ * committed must be provided as an argument while invoking this function.<br />
+ * 
+ * This function should execute only on one server. If the transaction is not
+ * hosted on the server where the function is invoked then this function decides
+ * to invoke a nested {@link NestedTransactionFunction} which executes on the member where
+ * transaction is hosted.<br />
+ * 
+ * This function returns a single Boolean as result, whose value is <code>Boolean.TRUE</code>
+ * if the transaction committed successfully otherwise the return value is
+ * <code>Boolean.FALSE</code>.<br />
+ * 
+ * To execute this function, it is recommended to use the {@link Execution} obtained by
+ * using TransactionFunctionService. <br />
+ * 
+ * To summarize, this function should be used as follows:
+ * 
+ * <pre>
+ * Execution exe = TransactionFunctionService.onTransaction(txId);
+ * List l = (List) exe.execute(commitFunction).getResult();
+ * Boolean result = (Boolean) l.get(0);
+ * </pre>
+ * 
+ * This function is <b>not</b> registered on the cache servers by default, and
+ * it is the user's responsibility to register this function. see
+ * {@link FunctionService#registerFunction(Function)}
+ * 
+ * @since 6.6.1
+ */
+public class CommitFunction implements Function {
+  private static final Logger logger = LogService.getLogger();
+
+  private static final long serialVersionUID = 7851518767859544501L;
+
+  public boolean hasResult() {
+    return true;
+  }
+
+  public void execute(FunctionContext context) {
+    Cache cache = CacheFactory.getAnyInstance();
+    TXId txId = null;
+    try {
+      txId = (TXId) context.getArguments();
+    } catch (ClassCastException e) {
+      logger.info("CommitFunction should be invoked with a TransactionId as an argument i.e. withArgs(txId).execute(function)");
+      throw e;
+    }
+    DistributedMember member = txId.getMemberId();
+    Boolean result = false;
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (cache.getDistributedSystem().getDistributedMember().equals(member)) {
+      if (isDebugEnabled) {
+        logger.debug("CommitFunction: for transaction: {} committing locally", txId);
+      }
+      CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+      if (txMgr.tryResume(txId)) {
+        if (isDebugEnabled) {
+          logger.debug("CommitFunction: resumed transaction: {}", txId);
+        }
+        txMgr.commit();
+        result = true;
+      }
+    } else {
+      ArrayList args = new ArrayList();
+      args.add(txId);
+      args.add(NestedTransactionFunction.COMMIT);
+      Execution ex = FunctionService.onMember(cache.getDistributedSystem(),
+          member).withArgs(args);
+      if (isDebugEnabled) {
+        logger.debug("CommitFunction: for transaction: {} executing NestedTransactionFunction on member: {}", txId, member);
+      }
+      try {
+        List list = (List) ex.execute(new NestedTransactionFunction()).getResult();
+        result = (Boolean) list.get(0);
+      } catch (FunctionException fe) {
+        if (fe.getCause() instanceof FunctionInvocationTargetException) {
+          throw new TransactionDataNodeHasDepartedException("Could not commit on member:"+member);
+        } else {
+          throw fe;
+        }
+      }
+    }
+    if (isDebugEnabled) {
+      logger.debug("CommitFunction: for transaction: {} returning result: {}", txId, result);
+    }
+    context.getResultSender().lastResult(result);
+  }
+
+  public String getId() {
+    return getClass().getName();
+  }
+
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  public boolean isHA() {
+    //GEM-207
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/NestedTransactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/NestedTransactionFunction.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/NestedTransactionFunction.java
new file mode 100644
index 0000000..edf259c
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/NestedTransactionFunction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * This function is used by {@link CommitFunction} to commit existing transaction.
+ * A {@link TransactionId} corresponding to the transaction to be
+ * committed must be provided as an argument while invoking this function.<br />
+ * 
+ * When executed this function commits a transaction if it exists locally.<br />
+ * 
+ * This function returns a single Boolean as result, whose value is <code>Boolean.TRUE</code>
+ * if the transaction committed successfully otherwise the return value is
+ * <code>Boolean.FALSE</code><br />
+ * 
+ * This function is <b>not</b> registered on the cache servers by default, and
+ * it is the user's responsibility to register this function. see
+ * {@link FunctionService#registerFunction(Function)}
+ * 
+ * @see CommitFunction
+ * @since 6.6.1
+ *
+ */
+public class NestedTransactionFunction implements Function {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final int COMMIT = 1;
+  public static final int ROLLBACK = 2;
+  
+  private static final long serialVersionUID = 1400965724856341543L;
+
+  public boolean hasResult() {
+    return true;
+  }
+
+  public void execute(FunctionContext context) {
+    Cache cache = CacheFactory.getAnyInstance();
+    ArrayList args = (ArrayList) context.getArguments();
+    TXId txId = null;
+    int action = 0;
+    try {
+      txId = (TXId) args.get(0);
+      action = (Integer) args.get(1);
+    } catch (ClassCastException e) {
+      logger.info("CommitFunction should be invoked with a TransactionId as an argument i.e. withArgs(txId).execute(function)");
+      throw e;
+    }
+    CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+    Boolean result = false;
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (txMgr.tryResume(txId)) {
+      if (isDebugEnabled) {
+        logger.debug("CommitFunction: resumed transaction: {}", txId);
+      }
+      if (action == COMMIT) {
+        if (isDebugEnabled) {
+          logger.debug("CommitFunction: committing transaction: {}", txId);
+        }
+        txMgr.commit();
+      } else if (action == ROLLBACK) {
+        if (isDebugEnabled) {
+          logger.debug("CommitFunction: rolling back transaction: {}", txId);
+        }
+        txMgr.rollback();
+      } else {
+        throw new IllegalStateException("unknown transaction termination action");
+      }
+      result = true;
+    }
+    if (isDebugEnabled) {
+      logger.debug("CommitFunction: for transaction: {} sending result: {}", txId, result);
+    }
+    context.getResultSender().lastResult(result);
+  }
+
+  public String getId() {
+    return getClass().getName();
+  }
+
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  public boolean isHA() {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c033cfe/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RollbackFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RollbackFunction.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RollbackFunction.java
new file mode 100644
index 0000000..7a319a2
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RollbackFunction.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * This function can be used by GemFire clients and peers to rollback an existing
+ * transaction. A {@link TransactionId} corresponding to the transaction to be
+ * rolledback must be provided as an argument while invoking this function.<br />
+ * 
+ * This function should execute only on one server. If the transaction is not
+ * hosted on the server where the function is invoked then this function decides
+ * to invoke a {@link NestedTransactionFunction} which executes on the member where
+ * transaction is hosted.<br />
+ * 
+ * This function returns a single Boolean as result, whose value is <code>Boolean.TRUE</code>
+ * if the transaction rolled back successfully otherwise the return value is
+ * <code>Boolean.FALSE</code>.<br />
+ * 
+ * To execute this function, it is recommended to use the {@link Execution} obtained by
+ * using TransactionFunctionService. <br />
+ * 
+ * To summarize, this function should be used as follows:
+ * 
+ * <pre>
+ * Execution exe = TransactionFunctionService.onTransaction(txId);
+ * List l = (List) exe.execute(rollbackFunction).getResult();
+ * Boolean result = (Boolean) l.get(0);
+ * </pre>
+ * 
+ * This function is <b>not</b> registered on the cache servers by default, and
+ * it is the user's responsibility to register this function. see
+ * {@link FunctionService#registerFunction(Function)}
+ * 
+ * @since 6.6.1
+ */
+public class RollbackFunction implements Function {
+  private static final Logger logger = LogService.getLogger();
+
+  private static final long serialVersionUID = 1377183180063184795L;
+
+  public boolean hasResult() {
+    return true;
+  }
+
+  public void execute(FunctionContext context) {
+    Cache cache = CacheFactory.getAnyInstance();
+    TXId txId = null;
+    try {
+      txId = (TXId) context.getArguments();
+    } catch (ClassCastException e) {
+      logger.info("RollbackFunction should be invoked with a TransactionId as an argument i.e. withArgs(txId).execute(function)");
+      throw e;
+    }
+    DistributedMember member = txId.getMemberId();
+    Boolean result = false;
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (cache.getDistributedSystem().getDistributedMember().equals(member)) {
+      if (isDebugEnabled) {
+        logger.debug("RollbackFunction: for transaction: {} rolling back locally", txId);
+      }
+      CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+      if (txMgr.tryResume(txId)) {
+        if (isDebugEnabled) {
+          logger.debug("RollbackFunction: resumed transaction: {}", txId);
+        }
+        txMgr.rollback();
+        result = true;
+      }
+    } else {
+      ArrayList args = new ArrayList();
+      args.add(txId);
+      args.add(NestedTransactionFunction.ROLLBACK);
+      Execution ex = FunctionService.onMember(cache.getDistributedSystem(),
+          member).withArgs(args);
+      if (isDebugEnabled) {
+        logger.debug("RollbackFunction: for transaction: {} executing NestedTransactionFunction on member: {}", txId, member);
+      }
+      try {
+        List list = (List) ex.execute(new NestedTransactionFunction()).getResult();
+        result = (Boolean) list.get(0);
+      } catch (FunctionException fe) {
+        throw new TransactionDataNodeHasDepartedException("Could not Rollback on member:"+member);
+      }
+    }
+    if (isDebugEnabled) {
+      logger.debug("RollbackFunction: for transaction: {} returning result: {}", txId, result);
+    }
+    context.getResultSender().lastResult(result);
+  }
+
+  public String getId() {
+    return getClass().getName();
+  }
+
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  public boolean isHA() {
+    //GEM-207
+    return true;
+  }
+
+}


[06/18] incubator-geode git commit: GEODE-1217: Marking jansi as optional in the pom

Posted by kl...@apache.org.
GEODE-1217: Marking jansi as optional in the pom


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/69cd4a7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/69cd4a7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/69cd4a7e

Branch: refs/heads/feature/GEODE-1162
Commit: 69cd4a7e3f2d77d6d0790bf47ed82278b4d09d42
Parents: 6c033cf
Author: Dan Smith <up...@apache.org>
Authored: Tue Apr 12 13:38:20 2016 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Wed Apr 13 10:08:42 2016 -0700

----------------------------------------------------------------------
 geode-core/build.gradle | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/69cd4a7e/geode-core/build.gradle
----------------------------------------------------------------------
diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 526f22a..8216d2a 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -88,7 +88,9 @@ dependencies {
 
   compile 'org.apache.logging.log4j:log4j-api:' + project.'log4j.version'
   compile 'org.apache.logging.log4j:log4j-core:' + project.'log4j.version'
-  runtime 'org.fusesource.jansi:jansi:' + project.'jansi.version'
+  runtime ('org.fusesource.jansi:jansi:' + project.'jansi.version') {
+    ext.optional = true
+  }
   runtime ('org.apache.logging.log4j:log4j-slf4j-impl:' + project.'log4j.version') {
     ext.optional = true
   }


[10/18] incubator-geode git commit: GEODE-1204 modified conditionals and opacity to make logo visible on Releases page.

Posted by kl...@apache.org.
GEODE-1204 modified conditionals and opacity to make logo visible on Releases page.

Closes #45971


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f473facb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f473facb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f473facb

Branch: refs/heads/feature/GEODE-1162
Commit: f473facb33fea633782968513b792f13817c54a9
Parents: 0f692f8
Author: Dave Barnes <db...@pivotal.io>
Authored: Fri Apr 8 16:34:22 2016 -0700
Committer: Dave Barnes <db...@pivotal.io>
Committed: Wed Apr 13 10:57:48 2016 -0700

----------------------------------------------------------------------
 geode-site/website/Rules                 | 6 ++++++
 geode-site/website/layouts/default.html  | 2 +-
 geode-site/website/layouts/header.html   | 2 +-
 geode-site/website/layouts/releases.html | 1 +
 4 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f473facb/geode-site/website/Rules
----------------------------------------------------------------------
diff --git a/geode-site/website/Rules b/geode-site/website/Rules
index 5369385..46aead3 100644
--- a/geode-site/website/Rules
+++ b/geode-site/website/Rules
@@ -44,6 +44,12 @@ compile '/community/*' do
   layout 'community'
 end
 
+compile '/releases/*' do
+  @releases = true
+  filter :erb
+  layout 'releases'
+end
+
 compile '*' do
   if item.binary?
     # don’t filter binary items

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f473facb/geode-site/website/layouts/default.html
----------------------------------------------------------------------
diff --git a/geode-site/website/layouts/default.html b/geode-site/website/layouts/default.html
index 2e3e2cd..2bdf9ef 100644
--- a/geode-site/website/layouts/default.html
+++ b/geode-site/website/layouts/default.html
@@ -1,7 +1,7 @@
 <!DOCTYPE html>
 <html lang="en">
 
-<%= render 'header', {:docs => @docs, :community => @community} %>
+<%= render 'header', {:docs => @docs, :community => @community, :releases => @releases} %>
 
 <%= @content %>
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f473facb/geode-site/website/layouts/header.html
----------------------------------------------------------------------
diff --git a/geode-site/website/layouts/header.html b/geode-site/website/layouts/header.html
index 6edaec8..b5a4ca7 100644
--- a/geode-site/website/layouts/header.html
+++ b/geode-site/website/layouts/header.html
@@ -230,7 +230,7 @@
 </head>
 <body>
 
-    <header class="navbar navbar-inverse navbar-fixed-top bf-docs-nav <%= 'secondary' if @docs or @community %>" role="banner">
+    <header class="navbar navbar-inverse navbar-fixed-top bf-docs-nav <%= 'secondary' if @docs or @community or @releases %>" role="banner">
     <div class="container">
         <div class="navbar-header">
             <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bf-navbar-collapse">

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f473facb/geode-site/website/layouts/releases.html
----------------------------------------------------------------------
diff --git a/geode-site/website/layouts/releases.html b/geode-site/website/layouts/releases.html
new file mode 100644
index 0000000..9240770
--- /dev/null
+++ b/geode-site/website/layouts/releases.html
@@ -0,0 +1 @@
+<%= render 'default', :releases => true, :content => @content, :item => @item %>


[13/18] incubator-geode git commit: GEODE-1225: Converted SSL log message from info to fine

Posted by kl...@apache.org.
GEODE-1225: Converted SSL log message from info to fine


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bbf705ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bbf705ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bbf705ed

Branch: refs/heads/feature/GEODE-1162
Commit: bbf705edc7cef804d3b84c1b40551630ae67a5fb
Parents: 7d5f39a
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Apr 13 14:10:18 2016 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Apr 13 14:10:18 2016 -0700

----------------------------------------------------------------------
 .../java/com/gemstone/gemfire/internal/SocketCreator.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bbf705ed/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index 2130a26..acdfbc7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -1055,7 +1055,9 @@ public class SocketCreator {
             sslSocket.startHandshake();
             SSLSession session = sslSocket.getSession();
             Certificate[] peer = session.getPeerCertificates();
-            logger.info(LocalizedMessage.create(LocalizedStrings.SocketCreator_SSL_CONNECTION_FROM_PEER_0, ((X509Certificate)peer[0]).getSubjectDN()));
+            if (logger.isDebugEnabled()) {
+              logger.debug(LocalizedMessage.create(LocalizedStrings.SocketCreator_SSL_CONNECTION_FROM_PEER_0, ((X509Certificate)peer[0]).getSubjectDN()));
+            }
           }
           catch (SSLPeerUnverifiedException ex) {
             if (this.needClientAuth) {
@@ -1119,7 +1121,9 @@ public class SocketCreator {
         sslSocket.startHandshake();
         SSLSession session = sslSocket.getSession();
         Certificate[] peer = session.getPeerCertificates();
-        logger.info(LocalizedMessage.create(LocalizedStrings.SocketCreator_SSL_CONNECTION_FROM_PEER_0, ((X509Certificate)peer[0]).getSubjectDN()));
+        if (logger.isDebugEnabled()) {
+          logger.debug(LocalizedMessage.create(LocalizedStrings.SocketCreator_SSL_CONNECTION_FROM_PEER_0, ((X509Certificate)peer[0]).getSubjectDN()));
+        }
       }
       catch (SSLPeerUnverifiedException ex) {
         if (this.needClientAuth) {


[11/18] incubator-geode git commit: GEODE-1219: Updated lucene version to 6.0.0

Posted by kl...@apache.org.
GEODE-1219: Updated lucene version to 6.0.0

Modifications to code due to lucene api changes
Implemented temporary output stream in Directory and FileSystem


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6c7a0d2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6c7a0d2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6c7a0d2a

Branch: refs/heads/feature/GEODE-1162
Commit: 6c7a0d2a2d3c0c56828c9c0498390b505de00ea4
Parents: f473fac
Author: Jason Huynh <hu...@gmail.com>
Authored: Thu Apr 7 15:40:35 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Wed Apr 13 11:13:27 2016 -0700

----------------------------------------------------------------------
 .../internal/directory/RegionDirectory.java     | 20 ++++++++++++++++----
 .../lucene/internal/filesystem/FileSystem.java  |  5 +++++
 .../repository/IndexRepositoryImpl.java         |  2 +-
 .../repository/serializer/SerializerUtil.java   | 16 ++++++++--------
 .../IndexRepositoryImplPerformanceTest.java     |  4 ++--
 gradle/dependency-versions.properties           |  2 +-
 6 files changed, 33 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c7a0d2a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
index e25dc77..28117bd 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
@@ -21,7 +21,9 @@ package com.gemstone.gemfire.cache.lucene.internal.directory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.lucene.store.BaseDirectory;
@@ -45,7 +47,7 @@ import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystem;
 public class RegionDirectory extends BaseDirectory {
 
   private final FileSystem fs;
-  
+
   /**
    * Create a region directory with a given file and chunk region. These regions
    * may be bucket regions or they may be replicated regions.
@@ -54,11 +56,13 @@ public class RegionDirectory extends BaseDirectory {
     super(new SingleInstanceLockFactory());
     fs = new FileSystem(fileRegion, chunkRegion);
   }
-  
+
   @Override
   public String[] listAll() throws IOException {
     ensureOpen();
-    return fs.listFileNames().toArray(new String[] {});
+    String[] array = fs.listFileNames().toArray(new String[]{});
+    Arrays.sort(array);
+    return array;
   }
 
   @Override
@@ -79,7 +83,15 @@ public class RegionDirectory extends BaseDirectory {
     final File file = fs.createFile(name);
     final OutputStream out = file.getOutputStream();
 
-    return new OutputStreamIndexOutput(name, out, 1000);
+    return new OutputStreamIndexOutput(name, name, out, 1000);
+  }
+
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+    String name = prefix + "_temp_" + UUID.randomUUID() + suffix;
+    final File file = fs.createTemporaryFile(name);
+    final OutputStream out = file.getOutputStream();
+
+    return new OutputStreamIndexOutput(name, name, out, 1000);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c7a0d2a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
index b84dc92..44513f1 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
@@ -60,6 +60,11 @@ public class FileSystem {
     // TODO unlock region ?
     return file;
   }
+
+  public File createTemporaryFile(final String name) throws IOException {
+    final File file = new File(this, name);
+    return file;
+  }
   
   public File getFile(final String name) throws FileNotFoundException {
     final File file = fileRegion.get(name);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c7a0d2a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
index a9c463e..e589ef4 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -51,7 +51,7 @@ public class IndexRepositoryImpl implements IndexRepository {
   public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer serializer) throws IOException {
     this.region = region;
     this.writer = writer;
-    searcherManager = new SearcherManager(writer, APPLY_ALL_DELETES, null);
+    searcherManager = new SearcherManager(writer, APPLY_ALL_DELETES, true, null);
     this.serializer = serializer;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c7a0d2a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/serializer/SerializerUtil.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/serializer/SerializerUtil.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/serializer/SerializerUtil.java
index 7ffc5db..0ed9d5d 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/serializer/SerializerUtil.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/serializer/SerializerUtil.java
@@ -24,11 +24,11 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoubleField;
+import org.apache.lucene.document.DoublePoint;
 import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.FloatField;
-import org.apache.lucene.document.IntField;
-import org.apache.lucene.document.LongField;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexableField;
@@ -79,13 +79,13 @@ public class SerializerUtil {
     if(clazz == String.class) {
       doc.add(new TextField(field, (String)fieldValue, Store.NO));
     } else if (clazz == Long.class) {
-      doc.add(new LongField(field, (Long) fieldValue, Store.NO));
+      doc.add(new LongPoint(field, (Long) fieldValue));
     } else if (clazz == Integer.class) {
-      doc.add(new IntField(field, (Integer) fieldValue, Store.NO));
+      doc.add(new IntPoint(field, (Integer) fieldValue));
     } else if (clazz == Float.class) {
-      doc.add(new FloatField(field, (Float) fieldValue, Store.NO));
+      doc.add(new FloatPoint(field, (Float) fieldValue));
     }  else if (clazz == Double.class) {
-        doc.add(new DoubleField(field, (Double) fieldValue, Store.NO));
+        doc.add(new DoublePoint(field, (Double) fieldValue));
     } else {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c7a0d2a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
index 74f3742..e996f30 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
@@ -222,7 +222,7 @@ public class IndexRepositoryImplPerformanceTest {
         RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>());
         IndexWriterConfig config = new IndexWriterConfig(analyzer);
         writer = new IndexWriter(dir, config);
-        searcherManager = new SearcherManager(writer, true, null);
+        searcherManager = new SearcherManager(writer, true, true, null);
       }
 
       @Override
@@ -275,7 +275,7 @@ public class IndexRepositoryImplPerformanceTest {
         RAMDirectory dir = new RAMDirectory();
         IndexWriterConfig config = new IndexWriterConfig(analyzer);
         writer = new IndexWriter(dir, config);
-        searcherManager = new SearcherManager(writer, true, null);
+        searcherManager = new SearcherManager(writer, true, true, null);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6c7a0d2a/gradle/dependency-versions.properties
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.properties b/gradle/dependency-versions.properties
index 18e327e..f89f281 100644
--- a/gradle/dependency-versions.properties
+++ b/gradle/dependency-versions.properties
@@ -71,7 +71,7 @@ jsr305.version = 3.0.1
 junit.version = 4.12
 JUnitParams.version = 1.0.4
 log4j.version = 2.5
-lucene.version = 5.3.0
+lucene.version = 6.0.0
 mockito-core.version = 1.10.19
 mockrunner.version = 1.0.8
 multithreadedtc.version = 1.01


[09/18] incubator-geode git commit: GEODE-1216 Fix the scalability of remove member

Posted by kl...@apache.org.
GEODE-1216 Fix the scalability of remove member

Enable the optimization in sendSuspectRequest so that health monitor
sends suspect requests to appropriate recipients.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0f692f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0f692f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0f692f86

Branch: refs/heads/feature/GEODE-1162
Commit: 0f692f860c6db1f3368d474bfb0c6e8b916f833f
Parents: 9d20f22
Author: Jianxia Chen <jc...@pivotal.io>
Authored: Wed Apr 13 10:22:12 2016 -0700
Committer: Jianxia Chen <jc...@pivotal.io>
Committed: Wed Apr 13 10:22:12 2016 -0700

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java      | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f692f86/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 5427d77..2d0f039 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1331,18 +1331,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 //    }
     logger.debug("Sending suspect request for members {}", requests);
     List<InternalDistributedMember> recipients;
-//  TODO this needs some rethinking - we need the guys near the
-//  front of the membership view who aren't preferred for coordinator
-//  to see the suspect message.
-//    if (v.size() > 20) {
-//      HashSet<InternalDistributedMember> filter = new HashSet<InternalDistributedMember>();
-//      for (int i = 0; i < requests.size(); i++) {
-//        filter.add(requests.get(i).getSuspectMember());
-//      }
-//      recipients = currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5);
-//    } else {
+    if (currentView.size() > 4) {
+      HashSet<InternalDistributedMember> filter = new HashSet<InternalDistributedMember>();
+      for (int i = 0; i < requests.size(); i++) {
+        filter.add(requests.get(i).getSuspectMember());
+      }
+      recipients = currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5);
+    } else {
       recipients = currentView.getMembers();
-//    }
+    }
 
     SuspectMembersMessage smm = new SuspectMembersMessage(recipients, requests);
     Set<InternalDistributedMember> failedRecipients;


[15/18] incubator-geode git commit: GEODE-1201: Adding compileRuntimeLibs to geode-assembly for tests Amending bind address configuration for http-service-bind-address

Posted by kl...@apache.org.
GEODE-1201: Adding compileRuntimeLibs to geode-assembly for tests
Amending bind address configuration for http-service-bind-address


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d1e48259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d1e48259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d1e48259

Branch: refs/heads/feature/GEODE-1162
Commit: d1e48259894e60c54c588adba23a458f028e7145
Parents: a885ac1
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Mon Apr 11 07:57:21 2016 +1000
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Thu Apr 14 10:07:06 2016 +1000

----------------------------------------------------------------------
 geode-assembly/build.gradle                     |  6 +++
 .../gemfire/management/internal/RestAgent.java  | 51 ++++++++++++++------
 .../web/swagger/config/RestApiPathProvider.java | 27 ++++++++++-
 3 files changed, 68 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1e48259/geode-assembly/build.gradle
----------------------------------------------------------------------
diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index b7d05e2..8f14fc1 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -81,6 +81,7 @@ sourceSets {
 
 test {
   // test from the actual classpath not the gradle classpath
+  dependsOn copyRuntimeLibs
   dependsOn installDist
   // @TODO: this doesn't seem to be working need to get basename first.
   classpath += files "$buildDir/install/apache-geode/lib/geode-dependencies.jar"
@@ -112,6 +113,11 @@ task defaultCacheConfig(type: JavaExec, dependsOn: classes) {
   }
 }
 
+task copyRuntimeLibs(type: Copy) {
+  into "lib"
+  from configurations.testRuntime - configurations.runtime
+}
+
 // This closure sets the gemfire classpath.  If we add another jar to the classpath it must
 // be included in the filter logic below.
 def cp = {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1e48259/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
index a91df05..526bd2d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
@@ -17,26 +17,25 @@
 
 package com.gemstone.gemfire.management.internal;
 
-import org.apache.logging.log4j.Logger;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.GemFireVersion;
+import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.management.ManagementService;
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+
+import java.net.UnknownHostException;
 
 /**
  * Agent implementation that controls the HTTP server end points used for REST
  * clients to connect gemfire data node.
- * 
+ * <p>
  * The RestAgent is used to start http service in embedded mode on any non
  * manager data node with developer REST APIs service enabled.
  *
@@ -109,8 +108,9 @@ public class RestAgent {
   public void startHttpService() {
     // TODO: add a check that will make sure that we start HTTP service on
     // non-manager data node
+    String httpServiceBindAddress = getBindAddressForHttpService();
     logger.info("Attempting to start HTTP service on port ({}) at bind-address ({})...",
-        this.config.getHttpServicePort(), this.config.getHttpServiceBindAddress());
+        this.config.getHttpServicePort(), httpServiceBindAddress);
 
     // Find the developer REST WAR file
     final String gemfireAPIWar = agentUtil.findWarLocation("geode-web-api");
@@ -124,10 +124,9 @@ public class RestAgent {
         logger.warn("Detected presence of catalina system properties. HTTP service will not be started. To enable the GemFire Developer REST API, please deploy the /geode-web-api WAR file in your application server."); 
       } else if (agentUtil.isWebApplicationAvailable(gemfireAPIWar)) {
 
-        final String bindAddress = this.config.getHttpServiceBindAddress();
         final int port = this.config.getHttpServicePort();
 
-        this.httpServer = JettyHelper.initJetty(bindAddress, port,
+        this.httpServer = JettyHelper.initJetty(httpServiceBindAddress, port,
             this.config.getHttpServiceSSLEnabled(),
             this.config.getHttpServiceSSLRequireAuthentication(),
             this.config.getHttpServiceSSLProtocols(), this.config.getHttpServiceSSLCiphers(),
@@ -136,8 +135,8 @@ public class RestAgent {
         this.httpServer = JettyHelper.addWebApplication(httpServer, "/gemfire-api", gemfireAPIWar);
 
         if (logger.isDebugEnabled()) {
-          logger.debug("Starting HTTP embedded server on port ({}) at bind-address ({})...",
-              ((ServerConnector) this.httpServer.getConnectors()[0]).getPort(), bindAddress);
+          logger.info("Starting HTTP embedded server on port ({}) at bind-address ({})...",
+              ((ServerConnector) this.httpServer.getConnectors()[0]).getPort(), httpServiceBindAddress);
         }
 
         this.httpServer = JettyHelper.startJetty(this.httpServer);
@@ -151,6 +150,28 @@ public class RestAgent {
     }
   }
 
+  private String getBindAddressForHttpService() {
+    java.lang.String bindAddress = this.config.getHttpServiceBindAddress();
+    if (StringUtils.isBlank(bindAddress)) {
+      if (StringUtils.isBlank(this.config.getServerBindAddress())) {
+        if (StringUtils.isBlank(this.config.getBindAddress())) {
+          try {
+            bindAddress = SocketCreator.getLocalHost().getHostAddress();
+            logger.info("RestAgent.getBindAddressForHttpService.localhost: " + SocketCreator.getLocalHost().getHostAddress());
+          } catch (UnknownHostException e) {
+            logger.error("LocalHost could not be found.", e);
+            return bindAddress;
+          }
+        } else {
+          bindAddress = this.config.getBindAddress();
+        }
+      } else {
+        bindAddress = this.config.getServerBindAddress();
+      }
+    }
+    return bindAddress;
+  }
+
   private void stopHttpService() {
     if (this.httpServer != null) {
       logger.info("Stopping the HTTP service...");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1e48259/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/swagger/config/RestApiPathProvider.java
----------------------------------------------------------------------
diff --git a/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/swagger/config/RestApiPathProvider.java b/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/swagger/config/RestApiPathProvider.java
index a8921d7..47316ed 100644
--- a/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/swagger/config/RestApiPathProvider.java
+++ b/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/swagger/config/RestApiPathProvider.java
@@ -18,8 +18,10 @@ package com.gemstone.gemfire.rest.internal.web.swagger.config;
 
 import javax.servlet.ServletContext;
 
+import com.gemstone.gemfire.admin.internal.InetAddressUtil;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.mangofactory.swagger.core.SwaggerPathProvider;
 
@@ -27,6 +29,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.Assert;
 import org.springframework.web.util.UriComponentsBuilder;
 
+import java.net.UnknownHostException;
+
 @SuppressWarnings("unused")
 public class RestApiPathProvider implements SwaggerPathProvider {
 
@@ -44,7 +48,28 @@ public class RestApiPathProvider implements SwaggerPathProvider {
     DistributionConfig config = InternalDistributedSystem.getAnyInstance().getConfig();
     String scheme = config.getHttpServiceSSLEnabled() ? "https" : "http";
 
-    this.docsLocation = scheme + "://" + config.getHttpServiceBindAddress() + ":" + config.getHttpServicePort();
+    this.docsLocation = scheme + "://" + getBindAddressForHttpService() + ":" + config.getHttpServicePort();
+  }
+
+  private String getBindAddressForHttpService() {
+    DistributionConfig config = InternalDistributedSystem.getAnyInstance().getConfig();
+    java.lang.String bindAddress = config.getHttpServiceBindAddress();
+    if (org.apache.commons.lang.StringUtils.isBlank(bindAddress)) {
+      if (org.apache.commons.lang.StringUtils.isBlank(config.getServerBindAddress())) {
+        if (org.apache.commons.lang.StringUtils.isBlank(config.getBindAddress())) {
+          try {
+          bindAddress = SocketCreator.getLocalHost().getHostAddress();
+          } catch (UnknownHostException e) {
+            e.printStackTrace();
+          }
+        } else {
+          bindAddress = config.getBindAddress();
+        }
+      } else {
+        bindAddress = config.getServerBindAddress();
+      }
+    }
+    return bindAddress;
   }
 
   @Override


[07/18] incubator-geode git commit: GEODE-1199: fix off-heap leak in netWrite

Posted by kl...@apache.org.
GEODE-1199: fix off-heap leak in netWrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/80533ba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/80533ba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/80533ba8

Branch: refs/heads/feature/GEODE-1162
Commit: 80533ba8503e8fa04aed3291b2d4efbc29e61535
Parents: fbee35c
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Apr 7 16:20:18 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Apr 13 10:16:20 2016 -0700

----------------------------------------------------------------------
 .../cache/SearchLoadAndWriteProcessor.java      | 28 ++++++--
 .../cache/SearchLoadAndWriteProcessorTest.java  | 68 ++++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
index 8224fc2..d9729a7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
@@ -74,6 +74,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 
 
 /**
@@ -211,7 +213,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
     int action = paction;
     this.requestInProgress = true;
-    Scope scope = this.region.scope;
+    Scope scope = this.region.getScope();
     if (localWriter != null) {
       doLocalWrite(localWriter, event, action);
       this.requestInProgress = false;
@@ -220,14 +222,22 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
     if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) {
       return false;
     }
+    @Released
     CacheEvent listenerEvent = getEventForListener(event);
+    try {
     if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) {
       action = BEFORECREATE;
     }
-    boolean cacheWrote = netWrite(getEventForListener(event), action, netWriteRecipients);
+    boolean cacheWrote = netWrite(listenerEvent, action, netWriteRecipients);
     this.requestInProgress = false;
     return cacheWrote;
-
+    } finally {
+      if (event != listenerEvent) {
+        if (listenerEvent instanceof EntryEventImpl) {
+          ((EntryEventImpl) listenerEvent).release();
+        }
+      }
+    }
   }
 
   public void memberJoined(InternalDistributedMember id) {
@@ -810,16 +820,20 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   /**
    * Returns an event for listener notification.  The event's operation
-   * may be altered to conform to the ConcurrentMap implementation specification
+   * may be altered to conform to the ConcurrentMap implementation specification.
+   * If the returned value is not == to the event parameter then the caller
+   * is responsible for releasing it.
    * @param event the original event
    * @return the original event or a new event having a change in operation
    */
+  @Retained
   private CacheEvent getEventForListener(CacheEvent event) {
     Operation op = event.getOperation();
     if (!op.isEntry()) {
       return event;
     } else {
       EntryEventImpl r = (EntryEventImpl)event;
+      @Retained
       EntryEventImpl result = r;
       if (r.isSingleHop()) {
         // fix for bug #46130 - origin remote incorrect for one-hop operation in receiver
@@ -856,6 +870,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         return false;
       }
     }
+    @Released
     CacheEvent event = getEventForListener(pevent);
     
     int action = paction;
@@ -898,8 +913,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
   /** Return true if cache writer was invoked */
   private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet)
   throws CacheWriterException, TimeoutException {
-
-    // assert !writeCandidateSet.isEmpty();
+    if (writeCandidateSet == null || writeCandidateSet.isEmpty()) {
+      return false;
+    }
     ArrayList list = new ArrayList(writeCandidateSet);
     Collections.shuffle(list);
     InternalDistributedMember[] writeCandidates = (InternalDistributedMember[])list.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
new file mode 100644
index 0000000..57aca37
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class SearchLoadAndWriteProcessorTest {
+
+  /**
+   * This test verifies the fix for GEODE-1199.
+   * It verifies that when doNetWrite is called with an event
+   * that has a StoredObject value that it will have "release"
+   * called on it.
+   */
+  @Test
+  public void verifyThatOffHeapReleaseIsCalledAfterNetWrite() {
+    // setup
+    SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+    Object key = "key";
+    StoredObject value = mock(StoredObject.class);
+    when(value.hasRefCount()).thenReturn(true);
+    when(value.retain()).thenReturn(true);
+    Object cbArg = null;
+    KeyInfo keyInfo = new KeyInfo(key, value, cbArg);
+    when(lr.getKeyInfo(any(), any(), any())).thenReturn(keyInfo);
+    processor.region = lr;
+    EntryEventImpl event = EntryEventImpl.create(lr, Operation.REPLACE, key, value, cbArg, false, null);
+    
+    try {
+      // the test
+      processor.doNetWrite(event, null, null, 0);
+      
+      // verification
+      verify(value, times(2)).retain();
+      verify(value, times(1)).release();
+      
+    } finally {
+      processor.release();
+    }
+  }
+
+}