You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/20 22:01:57 UTC

[03/50] [abbrv] incubator-geode git commit: GEODE-77: exiting from main() doesn't cause Shutdown Hook to run

GEODE-77: exiting from main() doesn't cause Shutdown Hook to run

This introduces a subclass of the JGroups UDP protocol in order to
force it to create daemon threads.  The subclass also skips setting
time-to-live on the UDP socket unless multicast is enabled, because attempting
to set it was causing InvocationTargetExceptions to be logged at ERROR level on Windows 7 machines.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0f7daf25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0f7daf25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0f7daf25

Branch: refs/heads/develop
Commit: 0f7daf2563145a1084f66a6195d3e354ebf063df
Parents: 5db8055
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Oct 16 15:44:08 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Oct 16 15:44:08 2015 -0700

----------------------------------------------------------------------
 .../internal/DistributionMessage.java           |   2 +-
 .../membership/gms/fd/GMSHealthMonitor.java     |   2 +
 .../gms/messenger/AddressManager.java           |   6 +-
 .../gms/messenger/JGroupsMessenger.java         |   7 +-
 .../membership/gms/messenger/Transport.java     |  59 ++++++++
 .../cache/DistributedCacheOperation.java        |   4 +-
 .../gemfire/internal/cache/LocalRegion.java     |   5 +
 .../distributed/internal/jgroups-config.xml     |   2 +-
 .../distributed/internal/jgroups-mcast.xml      |   2 +-
 .../AutoConnectionSourceWithUDPDUnitTest.java   | 140 -------------------
 .../messenger/JGroupsMessengerJUnitTest.java    |   2 +-
 11 files changed, 80 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
index 8cefd9c..ac6d66e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
@@ -485,7 +485,7 @@ public abstract class DistributionMessage
    */
   public static boolean isPreciousThread() {
     String thrname = Thread.currentThread().getName();
-    return thrname.startsWith("Incoming-") || thrname.startsWith("OOB-");
+    return thrname.startsWith("GEODE UDP");
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index ae79fc5..9d87014 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -749,6 +749,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
               logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
               boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+              logger.info("Final check {}", pinged? "succeeded" : "failed");
+              
               if (!pinged && !isStopping) {
                 ts = memberVsLastMsgTS.get(mbr);
                 if (ts == null || ts.getTimeStamp() <= startTime) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java
index a1305d5..8ca0c8e 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java
@@ -31,7 +31,7 @@ public class AddressManager extends Protocol {
 
   private static final Logger logger = Services.getLogger();
   
-  private UDP udp;
+  private TP transport;
   private Method setPingData;
   boolean warningLogged = false;
 
@@ -82,7 +82,7 @@ public class AddressManager extends Protocol {
     if (setPingData != null) {
       Exception problem = null;
       try {
-        setPingData.invoke(udp, new Object[]{pd});
+        setPingData.invoke(transport, new Object[]{pd});
       } catch (InvocationTargetException e) {
         problem = e;
       } catch (IllegalAccessException e) {
@@ -99,7 +99,7 @@ public class AddressManager extends Protocol {
    * find and initialize the method used to update UDP's address cache
    */
   private void findPingDataMethod() {
-    udp = (UDP)getProtocolStack().findProtocol("UDP");
+    transport = (TP)getProtocolStack().getTransport();
     try {
       setPingData = TP.class.getDeclaredMethod("setPingData", new Class<?>[]{PingData.class});
       setPingData.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 556f974..7f6a40e 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -100,8 +100,9 @@ public class JGroupsMessenger implements Messenger {
    */
   private static final String DEFAULT_JGROUPS_MCAST_CONFIG = "com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml";
 
-  /** JG magic numbers for types we need to serialize */
+  /** JG magic numbers for types added to the JG ClassConfigurator */
   public static final short JGROUPS_TYPE_JGADDRESS = 2000;
+  public static final short JGROUPS_PROTOCOL_TRANSPORT = 1000;
 
   public static boolean THROW_EXCEPTION_ON_START_HOOK;
 
@@ -130,7 +131,9 @@ public class JGroupsMessenger implements Messenger {
   
   static {
     // register classes that we've added to jgroups that are put on the wire
-    ClassConfigurator.add(JGroupsMessenger.JGROUPS_TYPE_JGADDRESS, JGAddress.class);
+    // or need a header ID
+    ClassConfigurator.add(JGROUPS_TYPE_JGADDRESS, JGAddress.class);
+    ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
new file mode 100755
index 0000000..4027437
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
@@ -0,0 +1,59 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.jgroups.protocols.UDP;
+import org.jgroups.util.DefaultThreadFactory;
+import org.jgroups.util.LazyThreadFactory;
+
+public class Transport extends UDP {
+  
+  /*
+   * (non-Javadoc)
+   * Copied from JGroups 3.6.6 UDP and modified to suppress
+   * stack traces when unable to set the ttl due to the TCP implementation
+   * not supporting the setting, and to only set ttl when multicast is
+   * going to be used.
+   * 
+   * @see org.jgroups.protocols.UDP#setTimeToLive(int)
+   */
+  @Override
+  protected void setTimeToLive(int ttl) {
+    if (ip_mcast) {
+      if(getImpl != null && setTimeToLive != null) {
+        try {
+            Object impl=getImpl.invoke(sock);
+            setTimeToLive.invoke(impl, ttl);
+        }
+        catch(InvocationTargetException e) {
+          log.info("Unable to set ip_ttl - TCP/IP implementation does not support this setting");
+        }
+        catch(Exception e) {
+            log.error("failed setting ip_ttl", e);
+        }
+      } else {
+        log.warn("ip_ttl %d could not be set in the datagram socket; ttl will default to 1 (getImpl=%s, " +
+                   "setTimeToLive=%s)", ttl, getImpl, setTimeToLive);
+      }
+    }
+  }
+
+    
+  /*
+   * (non-Javadoc)
+   * JGroups does not currently (3.6.6) allow you to specify that
+   * threads should be daemon, so we override the init() method here
+   * and create the factories before initializing UDP
+   * @see org.jgroups.protocols.UDP#init()
+   */
+  @Override
+  public void init() throws Exception {
+    global_thread_factory=new DefaultThreadFactory("Geode ", true);
+    timer_thread_factory=new LazyThreadFactory("Geode UDP Timer", true, true);
+    default_thread_factory=new DefaultThreadFactory("Geode UDP Incoming", true, true);
+    oob_thread_factory=new DefaultThreadFactory("Geode UDP OOB", true, true);
+    internal_thread_factory=new DefaultThreadFactory("Geode UDP INT", true, true);
+    super.init();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index bff0ff7..e528ca2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -1100,8 +1100,8 @@ public abstract class DistributedCacheOperation {
       boolean sendReply = true;
       InternalCacheEvent event = null;
 
-      if (logger.isDebugEnabled()) {
-        logger.debug("DistributedCacheOperation.basicProcess: {}", this);
+      if (logger.isTraceEnabled()) {
+        logger.trace("DistributedCacheOperation.basicProcess: {}", this);
       }
       try {
         // LocalRegion lclRgn = getRegionFromPath(dm.getSystem(),

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index bea7bd1..2dfbcee 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -9204,6 +9204,11 @@ public class LocalRegion extends AbstractRegion
   {
     
     CacheListener[] listeners = region.fetchCacheListenersField();
+    if (event.getOperation().isCreate()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("invoking listeners: " + Arrays.toString(listeners));
+      }
+    }
     if (listeners == null || listeners.length == 0) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml
index 73e66b4..8393d31 100755
--- a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml
+++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-config.xml
@@ -1,7 +1,7 @@
 <config xmlns="urn:org:jgroups"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
-<UDP
+<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.Transport
   BIND_ADDR_SETTING
   bind_port="MEMBERSHIP_PORT_RANGE_START"
   port_range="MEMBERSHIP_PORT_RANGE_END"

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml
index 4c6f25c..c16fad3 100755
--- a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml
+++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/jgroups-mcast.xml
@@ -1,7 +1,7 @@
 <config xmlns="urn:org:jgroups"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
-<UDP
+<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.Transport
   BIND_ADDR_SETTING
   bind_port="MEMBERSHIP_PORT_RANGE_START"
   port_range="MEMBERSHIP_PORT_RANGE_END"

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceWithUDPDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceWithUDPDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceWithUDPDUnitTest.java
deleted file mode 100644
index 9f1d67a..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceWithUDPDUnitTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.client.internal;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import junit.framework.Assert;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.Locator;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-
-import dunit.Host;
-import dunit.SerializableCallable;
-import dunit.SerializableRunnable;
-import dunit.VM;
-
-/**
- * Same tests as the auto connection source test, but the
- * system is using multicast for membership discovery, and
- * the locator is only used for peer discovery.
- * @author dsmith
- *
- */
-public class AutoConnectionSourceWithUDPDUnitTest extends
-    AutoConnectionSourceDUnitTest {
-
-  protected int mCastPort;
-
-  public AutoConnectionSourceWithUDPDUnitTest(String name) {
-    super(name);
-  }
-  
-  public void testStartLocatorLater() throws InterruptedException {
-    final Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    
-    startBridgeServerInVM(vm1, null, null);
-    
-    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    startLocatorInVM(vm0, locatorPort, "");
-    
-    startBridgeClientInVM(vm2, null, getServerHostName(vm0.getHost()), locatorPort);
-    putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
-    Assert.assertEquals("value", getInVM(vm1, "key"));
-  }
-  
-  public void setUp() throws Exception {
-    super.setUp();
-    mCastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS);
-    addExpectedException("java.net.SocketException");
-  }
-
-  protected int startBridgeServerInVM(VM vm, final String[] groups, String locators,
-      final String[] regions) {
-    SerializableCallable connect =
-      new SerializableCallable("Start bridge server") {
-          public Object call() throws IOException  {
-            Properties props = new Properties();
-            props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(mCastPort));
-            props.setProperty(DistributionConfig.MCAST_ADDRESS_NAME, DistributionConfig.DEFAULT_MCAST_ADDRESS.getHostAddress());
-            props.setProperty(DistributionConfig.LOCATORS_NAME, "");
-            DistributedSystem ds = getSystem(props);
-            Cache cache = CacheFactory.create(ds);
-            AttributesFactory factory = new AttributesFactory();
-            factory.setScope(Scope.DISTRIBUTED_ACK);
-            factory.setEnableBridgeConflation(true);
-            factory.setDataPolicy(DataPolicy.REPLICATE);
-            RegionAttributes attrs = factory.create();
-            for(int i = 0; i < regions.length; i++) {
-              cache.createRegion(regions[i], attrs);
-            }
-            CacheServer server = cache.addCacheServer();
-            final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
-            server.setPort(serverPort);
-            server.setGroups(groups);
-            server.start();
-            
-            remoteObjects.put(CACHE_KEY, cache);
-            
-            return new Integer(serverPort);
-          }
-        };
-    Integer port = (Integer) vm.invoke(connect);
-    return port.intValue();
-  }
-
-  public void startLocatorInVM(final VM vm, final int locatorPort, final String otherLocators) {
-    vm.invoke(new SerializableRunnable("Create Locator") {
-
-      final String testName= getUniqueName();
-      public void run() {
-        disconnectFromDS();
-        Properties props = new Properties();
-        props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(mCastPort));
-        props.setProperty(DistributionConfig.MCAST_ADDRESS_NAME, DistributionConfig.DEFAULT_MCAST_ADDRESS.getHostAddress());
-        props.setProperty(DistributionConfig.LOCATORS_NAME, "");
-        props.setProperty(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
-        props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-        InetAddress bindAddr = null;
-        try {
-          bindAddr = InetAddress.getByName(getServerHostName(vm.getHost()));
-        } catch (UnknownHostException uhe) {
-          fail("While resolving bind address ", uhe);
-        }
-        try {
-          File logFile = new File(testName + "-locator" + locatorPort
-              + ".log");
-          Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props, false, true, null);
-          remoteObjects.put(LOCATOR_KEY, locator);
-        } catch (IOException ex) {
-          fail("While starting locator on port " + locatorPort, ex);
-        }
-      }
-    });
-  }
-  
-  
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0f7daf25/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 985cfb5..e2c6700 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -95,7 +95,7 @@ public class JGroupsMessengerJUnitTest {
     messenger.init(services);
     
     String jgroupsConfig = messenger.getJGroupsStackConfig();
-    int startIdx = jgroupsConfig.indexOf("<UDP");
+    int startIdx = jgroupsConfig.indexOf("<com");
     int insertIdx = jgroupsConfig.indexOf('>', startIdx+4) + 1;
     jgroupsConfig = jgroupsConfig.substring(0, insertIdx) +
         "<"+InterceptUDP.class.getName()+"/>" +