Posted to commits@geode.apache.org by hi...@apache.org on 2016/06/01 22:29:23 UTC

[1/3] incubator-geode git commit: GEODE-1372 added unit test and some more fixes.

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1372 d6fb78341 -> 667c4259f


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
index 3a06c1c..701cc07 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
@@ -109,6 +109,11 @@ public class LocatorDUnitTest extends DistributedTestCase {
       system = null;
     }
   }
+  
+  //hook for child test classes to add extra DistributedSystem properties
+  protected void addDSProps(Properties p) {
+    
+  }
 
   ////////  Test Methods
 
@@ -120,7 +125,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * it hung with the restarted locator trying to become elder again because
    * it put its address at the beginning of the new view it sent out.
    */
-  public void testCollocatedLocatorWithSecurity() throws Exception {
+  public void ntestCollocatedLocatorWithSecurity() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm1 = host.getVM(1);
@@ -139,6 +144,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("security-peer-authenticator", "com.gemstone.gemfire.distributed.MyAuthenticator.create");
     properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
     properties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+    addDSProps(properties);
     system = (InternalDistributedSystem) DistributedSystem.connect(properties);
     InternalDistributedMember mbr = system.getDistributedMember();
     assertEquals("expected the VM to have NORMAL vmKind",
@@ -249,7 +255,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    *
    * @throws Exception
    */
-  public void testStartTwoLocators() throws Exception {
+  public void ntestStartTwoLocators() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM loc1 = host.getVM(1);
@@ -273,7 +279,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("member-timeout", "2000");
     properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
     properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    addDSProps(properties);
     SerializableCallable startLocator1 = new SerializableCallable("start locator1") {
       @Override
       public Object call() throws Exception {
@@ -362,7 +368,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
   /**
    * test lead member selection
    */
-  public void testLeadMemberSelection() throws Exception {
+  public void ntestLeadMemberSelection() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm1 = host.getVM(1);
@@ -377,7 +383,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("locators", locators);
     properties.put("enable-network-partition-detection", "true");
     properties.put("disable-auto-reconnect", "true");
-
+    addDSProps(properties);
     File logFile = new File("");
     if (logFile.exists()) {
       logFile.delete();
@@ -471,7 +477,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * We then kill the lead member and demonstrate that the original locator
    * (which is now the sole remaining member) shuts itself down.
    */
-  public void testLeadAndCoordFailure() throws Exception {
+  public void ntestLeadAndCoordFailure() throws Exception {
     IgnoredException.addIgnoredException("Possible loss of quorum due");
     disconnectAllFromDS();
     Host host = Host.getHost(0);
@@ -497,7 +503,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
     //    properties.put("log-level", "fine");
     properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    addDSProps(properties);
     try {
       final String uname = getUniqueName();
       File logFile = new File("");
@@ -604,7 +610,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * We then shut down the group coordinator and observe the second locator
    * pick up the job and the remaining member continues to operate normally.
    */
-  public void testLeadFailureAndCoordShutdown() throws Exception {
+  public void ntestLeadFailureAndCoordShutdown() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm1 = host.getVM(1);
@@ -628,7 +634,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("disable-auto-reconnect", "true");
     properties.put("member-timeout", "2000");
     properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    addDSProps(properties);
     SerializableRunnable stopLocator = getStopLocatorRunnable();
 
     try {
@@ -750,7 +756,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    */
   // disabled on trunk - should be reenabled on cedar_dev_Oct12
   // this test leaves a CloserThread around forever that logs "pausing" messages every 500 ms
-  public void testForceDisconnectAndPeerShutdownCause() throws Exception {
+  public void ntestForceDisconnectAndPeerShutdownCause() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm1 = host.getVM(1);
@@ -774,7 +780,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("member-timeout", "2000");
     properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
     properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    addDSProps(properties);
     SerializableRunnable stopLocator = getStopLocatorRunnable();
 
     try {
@@ -887,7 +893,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * We kill the coordinator and shut down the lead member and observe the second locator
   * pick up the job and the remaining member continues to operate normally.
    */
-  public void testLeadShutdownAndCoordFailure() throws Exception {
+  public void ntestLeadShutdownAndCoordFailure() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm1 = host.getVM(1);
@@ -910,7 +916,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     properties.put("disable-auto-reconnect", "true");
     properties.put("member-timeout", "2000");
     properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    addDSProps(properties);
     SerializableRunnable disconnect =
         new SerializableRunnable("Disconnect from " + locators) {
           public void run() {
@@ -1012,7 +1018,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * Tests that attempting to connect to a distributed system in which
    * no locator is defined throws an exception.
    */
-  public void testNoLocator() {
+  public void ntestNoLocator() {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     int port =
@@ -1022,7 +1028,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     Properties props = new Properties();
     props.setProperty("mcast-port", "0");
     props.setProperty("locators", locators);
-
+    addDSProps(props);
     final String expected = "java.net.ConnectException";
     final String addExpected =
         "<ExpectedException action=add>" + expected + "</ExpectedException>";
@@ -1072,7 +1078,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * <p>The locator is then restarted and is shown to take over the
    * role of membership coordinator.
    */
-  public void testOneLocator() throws Exception {
+  public void ntestOneLocator() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
@@ -1093,7 +1099,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
           locProps.setProperty("mcast-port", "0");
           locProps.setProperty("member-timeout", "1000");
           locProps.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+          addDSProps(locProps);  
           Locator.startLocatorAndDS(port, logFile, locProps);
         } catch (IOException ex) {
           com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port, ex);
@@ -1110,6 +1116,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
               props.setProperty("mcast-port", "0");
               props.setProperty("locators", locators);
               props.setProperty("member-timeout", "1000");
+              addDSProps(props);
               DistributedSystem.connect(props);
             }
           };
@@ -1120,7 +1127,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
       props.setProperty("mcast-port", "0");
       props.setProperty("locators", locators);
       props.setProperty("member-timeout", "1000");
-
+      addDSProps(props);
       system = (InternalDistributedSystem) DistributedSystem.connect(props);
 
       final DistributedMember coord = MembershipManagerHelper.getCoordinator(system);
@@ -1164,7 +1171,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * is correct.  It then restarts the locator to demonstrate that
    * it can connect to and function as the group coordinator
    */
-  public void testLocatorBecomesCoordinator() throws Exception {
+  public void ntestLocatorBecomesCoordinator() throws Exception {
     disconnectAllFromDS();
     final String expected = "java.net.ConnectException";
     final String addExpected =
@@ -1189,7 +1196,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
       props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
       props.setProperty(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "true");
       props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+      addDSProps(props);
       SerializableRunnable connect =
           new SerializableRunnable("Connect to " + locators) {
             public void run() {
@@ -1301,7 +1308,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
   /**
    * Tests starting multiple locators in multiple VMs.
    */
-  public void testMultipleLocators() throws Exception {
+  public void ntestMultipleLocators() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
@@ -1323,7 +1330,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     dsProps.setProperty("locators", locators);
     dsProps.setProperty("mcast-port", "0");
     dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    addDSProps(dsProps);
     vm0.invoke(new SerializableRunnable("Start locator on " + port1) {
       public void run() {
         File logFile = new File("");
@@ -1357,6 +1364,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
                 Properties props = new Properties();
                 props.setProperty("mcast-port", "0");
                 props.setProperty("locators", locators);
+                addDSProps(props);
                 DistributedSystem.connect(props);
               }
             };
@@ -1366,7 +1374,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
         Properties props = new Properties();
         props.setProperty("mcast-port", "0");
         props.setProperty("locators", locators);
-
+        addDSProps(props);
         system = (InternalDistributedSystem) DistributedSystem.connect(props);
 
         WaitCriterion ev = new WaitCriterion() {
@@ -1417,7 +1425,6 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * end up only have 1 master.
    * GEODE-870
    */
-  @Category(FlakyTest.class) // GEODE-1150: random ports, disk pollution, waitForCriterion, time sensitive, eats exceptions (fixed several)
   public void testMultipleLocatorsRestartingAtSameTime() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
@@ -1441,8 +1448,8 @@ public class LocatorDUnitTest extends DistributedTestCase {
     dsProps.setProperty(DistributionConfig.LOCATORS_NAME, locators);
     dsProps.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
     dsProps.setProperty(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "true");
-    dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+    dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");    
+    addDSProps(dsProps);
     startLocatorSync(vm0, new Object[] { port1, dsProps });
     startLocatorSync(vm1, new Object[] { port2, dsProps });
     startLocatorSync(vm2, new Object[] { port3, dsProps });
@@ -1620,7 +1627,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
   /**
    * Tests starting multiple locators in multiple VMs.
    */
-  public void testMultipleMcastLocators() throws Exception {
+  public void ntestMultipleMcastLocators() throws Exception {
     disconnectAllFromDS();
     IgnoredException.addIgnoredException("Could not stop  Distribution Locator"); // shutdown timing issue in InternalLocator
     Host host = Host.getHost(0);
@@ -1653,7 +1660,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
           props.setProperty("mcast-ttl", "0");
           props.setProperty("enable-network-partition-detection", "true");
           props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+          addDSProps(props);
           Locator.startLocatorAndDS(port1, logFile, null, props);
         } catch (IOException ex) {
           com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port1, ex);
@@ -1671,6 +1678,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
           props.setProperty("mcast-ttl", "0");
           props.setProperty("enable-network-partition-detection", "true");
           props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+          addDSProps(props);
           Locator.startLocatorAndDS(port2, logFile, null, props);
         } catch (IOException ex) {
           com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port2, ex);
@@ -1687,6 +1695,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
             props.setProperty("log-level", LogWriterUtils.getDUnitLogLevel());
             props.setProperty("mcast-ttl", "0");
             props.setProperty("enable-network-partition-detection", "true");
+            addDSProps(props);
             DistributedSystem.connect(props);
           }
         };
@@ -1700,7 +1709,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
       props.setProperty("log-level", LogWriterUtils.getDUnitLogLevel());
       props.setProperty("mcast-ttl", "0");
       props.setProperty("enable-network-partition-detection", "true");
-
+      addDSProps(props);
       system = (InternalDistributedSystem) DistributedSystem.connect(props);
       WaitCriterion ev = new WaitCriterion() {
         public boolean done() {
@@ -1735,7 +1744,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    * Tests that a VM can connect to a locator that is hosted in its
    * own VM.
    */
-  public void testConnectToOwnLocator() throws Exception {
+  public void ntestConnectToOwnLocator() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
 
@@ -1752,6 +1761,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
       props.setProperty("mcast-port", "0");
       props.setProperty("locators", locators);
       props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+      addDSProps(props);
       system = (InternalDistributedSystem) DistributedSystem.connect(props);
       system.disconnect();
     } finally {
@@ -1762,7 +1772,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
   /**
    * Tests that a single VM can NOT host multiple locators
    */
-  public void testHostingMultipleLocators() throws Exception {
+  public void ntestHostingMultipleLocators() throws Exception {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     //VM vm = host.getVM(0);
@@ -1797,6 +1807,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
               props.setProperty("mcast-port", "0");
               props.setProperty("locators", locators);
               props.setProperty("log-level", LogWriterUtils.getDUnitLogLevel());
+              addDSProps(props);
               DistributedSystem.connect(props);
             }
           };
@@ -1819,7 +1830,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
    *
    * @since 4.1
    */
-  public void testRestartLocator() throws Exception {
+  public void ntestRestartLocator() throws Exception {
     disconnectAllFromDS();
     port1 =
         AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
@@ -1831,6 +1842,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
     p.setProperty(DistributionConfig.LOCATORS_NAME, Host.getHost(0).getHostName() + "[" + port1 + "]");
     p.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     p.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+    addDSProps(p);
     if (stateFile.exists()) {
       stateFile.delete();
     }
@@ -1916,6 +1928,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
           Properties locProps = new Properties();
           locProps.put("mcast-port", "0");
           locProps.put("log-level", LogWriterUtils.getDUnitLogLevel());
+          addDSProps(locProps);
           Locator.startLocatorAndDS(port, logFile, locProps);
         } catch (IOException ex) {
           com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port, ex);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java
new file mode 100755
index 0000000..37f14c3
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java
@@ -0,0 +1,28 @@
+package com.gemstone.gemfire.distributed;
+
+import java.util.Properties;
+
+import org.junit.Test;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+
+public class LocatorUDPSecurityDUnitTest extends LocatorDUnitTest{
+
+  public LocatorUDPSecurityDUnitTest(String name) {
+    super(name);
+  }
+  
+  @Test
+  public void testLoop() throws Exception {
+    for(int i=0; i < 2; i++) {
+      testMultipleLocatorsRestartingAtSameTime();
+      tearDown();
+      setUp();
+    }
+  }
+  
+  @Override
+  protected void addDSProps(Properties p) {
+    p.setProperty(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+  }
+}
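The new subclass reruns the parent LocatorDUnitTest suite with the security-client-dhalgo property set to "AES:128" through the addDSProps hook added above (the parent's other tests are temporarily disabled by the "ntest" renames, so only testMultipleLocatorsRestartingAtSameTime is looped here). A minimal, self-contained sketch of the same template-method hook, using hypothetical class names; only addDSProps matches the Geode test code:

import java.util.Properties;

//hypothetical names; only the addDSProps hook mirrors the Geode test base class
class ConnectPropsBase {
  //subclasses override this hook to inject extra DistributedSystem properties
  protected void addDSProps(Properties p) {
  }

  Properties buildProps() {
    Properties props = new Properties();
    props.setProperty("mcast-port", "0");
    addDSProps(props);  //hook runs last so a subclass can add or override entries
    return props;
  }
}

public class UdpSecurityVariant extends ConnectPropsBase {
  @Override
  protected void addDSProps(Properties p) {
    //same property the subclass above sets, written as its plain string name
    p.setProperty("security-client-dhalgo", "AES:128");
  }

  public static void main(String[] args) {
    //prints {security-client-dhalgo=AES:128, mcast-port=0}
    System.out.println(new UdpSecurityVariant().buildProps());
  }
}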

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 1e1724d..0d3b9fc 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -190,7 +190,7 @@ public class GMSJoinLeaveJUnitTest {
     
     // simulate a response being received
     InternalDistributedMember sender = mockMembers[2];
-    FindCoordinatorResponse resp = new FindCoordinatorResponse(coordinator, sender);
+    FindCoordinatorResponse resp = new FindCoordinatorResponse(coordinator, sender, null, 0);
     gmsJoinLeave.processMessage(resp);
     // tell GMSJoinLeave that a unit test is running so it won't clear the
     // responses collection
@@ -207,7 +207,7 @@ public class GMSJoinLeaveJUnitTest {
   public void testProcessJoinMessageRejectOldMemberVersion() throws IOException {
     initMocks();
  
-    gmsJoinLeave.processMessage(new JoinRequestMessage(mockOldMember, mockOldMember, null, -1));
+    gmsJoinLeave.processMessage(new JoinRequestMessage(mockOldMember, mockOldMember, null, -1, 0));
     assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
     verify(messenger).send(any(JoinResponseMessage.class));
   }
@@ -230,7 +230,7 @@ public class GMSJoinLeaveJUnitTest {
     when(authenticator.authenticate(mockMembers[0], credentials)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
     when(services.getMessenger()).thenReturn(messenger);
          
-    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1));
+    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1, 0));
     assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
     verify(messenger).send(any(JoinResponseMessage.class));
   }
@@ -242,7 +242,7 @@ public class GMSJoinLeaveJUnitTest {
     when(authenticator.authenticate(mockMembers[0], null)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
     when(services.getMessenger()).thenReturn(messenger);
       
-    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], null, -1));
+    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], null, -1, 0));
     assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
     verify(messenger).send(any(JoinResponseMessage.class));
   }
@@ -257,9 +257,22 @@ public class GMSJoinLeaveJUnitTest {
       
     JoinResponseMessage[] joinResponse = gmsJoinLeave.getJoinResponseMessage();
     
-    JoinResponseMessage jrm = new JoinResponseMessage();
+    JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], new byte[9], 233);
     gmsJoinLeave.processMessage(jrm);
+    //this one should NOT be recorded; it just informs the member of a successful join
+    Assert.assertEquals(null, joinResponse[0]);
+    
+    jrm = new JoinResponseMessage("rejected...", 0);
+    gmsJoinLeave.processMessage(jrm);
+    //this one should be recorded
     Assert.assertEquals(jrm, joinResponse[0]);
+    
+    gmsJoinLeave.setJoinResponseMessage(null);
+    
+    jrm = new JoinResponseMessage(mockMembers[0], new NetView(), 0);
+    gmsJoinLeave.processMessage(jrm);
+    //this one should be recorded
+    Assert.assertEquals(jrm, joinResponse[0]);       
   }
   
   /**
@@ -475,10 +488,10 @@ public class GMSJoinLeaveJUnitTest {
     prepareAndInstallView(gmsJoinLeaveMemberId, createMemberList(gmsJoinLeaveMemberId,mockMembers[0]));
     gmsJoinLeave.getView().add(mockMembers[1]);
     GMSJoinLeaveTestHelper.becomeCoordinatorForTest(gmsJoinLeave);
-    JoinRequestMessage msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1);
+    JoinRequestMessage msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1, 0);
     msg.setSender(mockMembers[2]);
     gmsJoinLeave.processMessage(msg);
-    msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1);
+    msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1, 0);
     msg.setSender(mockMembers[2]);
     gmsJoinLeave.processMessage(msg);
     
@@ -872,7 +885,7 @@ public class GMSJoinLeaveJUnitTest {
     initMocks(false);
     System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
     gmsJoinLeave.join();
-    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1));
+    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1, 0));
     int viewRequests = gmsJoinLeave.getViewRequests().size();
     
     assertTrue( "There should be 1 viewRequest but found " + viewRequests, viewRequests == 1);
@@ -1072,13 +1085,13 @@ public class GMSJoinLeaveJUnitTest {
       initMocks(false);
       HashSet<InternalDistributedMember> registrants = new HashSet<>();
       registrants.add(mockMembers[0]);
-      FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true);
+      FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true, null);
       NetView view = createView();
-      JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view);
+      JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view, 0);
       
       TcpClientWrapper tcpClientWrapper = mock(TcpClientWrapper.class);
       gmsJoinLeave.setTcpClientWrapper(tcpClientWrapper);
-      FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1);
+      FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1, null, 0);
       int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
       when(tcpClientWrapper.sendCoordinatorFindRequest(new InetSocketAddress("localhost", 12345), fcreq, connectTimeout)).thenReturn(fcr);
       callAsnyc(()->{gmsJoinLeave.installView(view);});
@@ -1099,14 +1112,14 @@ public class GMSJoinLeaveJUnitTest {
       initMocks(false);
       HashSet<InternalDistributedMember> registrants = new HashSet<>();
       registrants.add(mockMembers[0]);
-      FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true);
+      FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true, null);
       NetView view = createView();
-      JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view);
+      JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view, 0);
       gmsJoinLeave.setJoinResponseMessage(jrm);
       
       TcpClientWrapper tcpClientWrapper = mock(TcpClientWrapper.class);
       gmsJoinLeave.setTcpClientWrapper(tcpClientWrapper);
-      FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1);
+      FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1, null, 0);
       int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
      //passing wrong port here, so it will fail
       when(tcpClientWrapper.sendCoordinatorFindRequest(new InetSocketAddress("localhost", 12346), fcreq, connectTimeout)).thenReturn(fcr);
@@ -1224,7 +1237,7 @@ public class GMSJoinLeaveJUnitTest {
   }
   
   private void processJoinMessage(InternalDistributedMember coordinator, InternalDistributedMember newMember, int port) {
-    JoinRequestMessage reqMsg = new JoinRequestMessage(coordinator, newMember, null, port);
+    JoinRequestMessage reqMsg = new JoinRequestMessage(coordinator, newMember, null, port, 0);
     gmsJoinLeave.processMessage(reqMsg);
   }
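The expanded join-response test above pins down a recording rule: a JoinResponseMessage carrying only the cluster secret (a successful-join ack) is consumed without being recorded, while a rejection string or a view-bearing response is kept for the waiting join thread to inspect. A standalone sketch of that rule with hypothetical names; this is not the GMSJoinLeave source, just the behavior the three assertions check:

//hypothetical sketch of the recording rule asserted above
final class JoinResponseSketch {
  final String rejectionReason;  //non-null: join was rejected
  final Object view;             //non-null: coordinator sent a membership view
  final byte[] secretBytes;      //non-null: plain "join succeeded" ack

  JoinResponseSketch(String rejectionReason, Object view, byte[] secretBytes) {
    this.rejectionReason = rejectionReason;
    this.view = view;
    this.secretBytes = secretBytes;
  }

  //only responses the waiting join thread must inspect get recorded;
  //a secret-only ack is consumed silently
  boolean shouldBeRecorded() {
    return rejectionReason != null || view != null;
  }
}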
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
index 3a08faa..1d7b8d0 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
@@ -158,7 +158,7 @@ public class GMSEncryptJUnitTest {
     initMocks();
 
     GMSEncrypt gmsEncrypt1 = new GMSEncrypt(services, mockMembers[1]); // this will be the sender
-    gmsEncrypt1.addClusterKey();
+    gmsEncrypt1.initClusterSecretKey();
     // establish the public keys for the sender and receiver
     netView.setPublicKey(mockMembers[1], gmsEncrypt1.getPublicKeyBytes());
     
@@ -182,7 +182,7 @@ public class GMSEncryptJUnitTest {
     initMocks();
 
     GMSEncrypt gmsEncrypt1 = new GMSEncrypt(services, mockMembers[1]); // this will be the sender
-    gmsEncrypt1.addClusterKey();
+    gmsEncrypt1.initClusterSecretKey();
     GMSEncrypt gmsEncrypt2 = new GMSEncrypt(services, mockMembers[2]); // this will be the sender
     
     // establish the public keys for the sender and receiver
@@ -191,7 +191,7 @@ public class GMSEncryptJUnitTest {
     
     gmsEncrypt1.installView(netView, mockMembers[1]);
     
-    byte[] secretBytes = gmsEncrypt1.getSecretBytes();
+    byte[] secretBytes = gmsEncrypt1.getClusterSecretKey();
     gmsEncrypt2.addClusterKey(secretBytes);
     
     gmsEncrypt2.installView(netView, mockMembers[1]);
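The renames above (addClusterKey -> initClusterSecretKey, getSecretBytes -> getClusterSecretKey) trace the cluster-key handover: one member generates the secret, and a peer installs it via addClusterKey(byte[]). The key exchange configured by "AES:128" is Diffie-Hellman based; a minimal JCA sketch of such an agreement, assuming standard JDK providers and not showing GMSEncrypt's actual internals:

import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PublicKey;
import java.security.spec.X509EncodedKeySpec;
import javax.crypto.KeyAgreement;

public class DhAgreementSketch {
  public static void main(String[] args) throws Exception {
    //each member generates a DH key pair; the public halves are exchanged
    KeyPairGenerator gen = KeyPairGenerator.getInstance("DH");
    gen.initialize(2048);
    KeyPair member1 = gen.generateKeyPair();
    KeyPair member2 = gen.generateKeyPair();

    //member1 receives member2's public key as bytes (as in setPublicKey above)
    byte[] peerPublicBytes = member2.getPublic().getEncoded();
    PublicKey peerPublic = KeyFactory.getInstance("DH")
        .generatePublic(new X509EncodedKeySpec(peerPublicBytes));

    //both sides derive the same shared secret independently
    KeyAgreement ka1 = KeyAgreement.getInstance("DH");
    ka1.init(member1.getPrivate());
    ka1.doPhase(peerPublic, true);
    byte[] shared1 = ka1.generateSecret();

    KeyAgreement ka2 = KeyAgreement.getInstance("DH");
    ka2.init(member2.getPrivate());
    ka2.doPhase(member1.getPublic(), true);
    byte[] shared2 = ka2.generateSecret();

    System.out.println(java.util.Arrays.equals(shared1, shared2));  //true
  }
}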

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 60e790b..14a0df1 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -69,6 +70,9 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Healt
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
@@ -96,10 +100,14 @@ public class JGroupsMessengerJUnitTest {
   private InterceptUDP interceptor;
   private long statsId = 123;
 
+  private void initMocks(boolean enableMcast) throws Exception {
+    initMocks(enableMcast, new Properties());
+  }
+  
   /**
    * Create stub and mock objects
    */
-  private void initMocks(boolean enableMcast) throws Exception {
+  private void initMocks(boolean enableMcast, Properties addProp) throws Exception {
     if (messenger != null) {
       messenger.stop();
       messenger = null;
@@ -112,6 +120,7 @@ public class JGroupsMessengerJUnitTest {
     nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
     nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[10344]");
     nonDefault.put(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "1");
+    nonDefault.putAll(addProp);
     DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
     RemoteTransportConfig tconfig = new RemoteTransportConfig(config,
         DistributionManager.NORMAL_DM_TYPE);
@@ -134,6 +143,7 @@ public class JGroupsMessengerJUnitTest {
     when(services.getHealthMonitor()).thenReturn(healthMonitor);
     when(services.getManager()).thenReturn(manager);
     when(services.getJoinLeave()).thenReturn(joinLeave);
+    
     DM dm = mock(DM.class);
     InternalDistributedSystem system = InternalDistributedSystem.newInstanceForTesting(dm, nonDefault);
     when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));
@@ -141,6 +151,9 @@ public class JGroupsMessengerJUnitTest {
     messenger = new JGroupsMessenger();
     messenger.init(services);
     
+    //if this is stubbed any earlier, the test gets a null messenger back
+    when(services.getMessenger()).thenReturn(messenger);
+    
     String jgroupsConfig = messenger.getJGroupsStackConfig();
     int startIdx = jgroupsConfig.indexOf("<com");
     int insertIdx = jgroupsConfig.indexOf('>', startIdx+4) + 1;
@@ -429,15 +442,15 @@ public class JGroupsMessengerJUnitTest {
     when(joinLeave.getView()).thenReturn(v);
 
     InternalDistributedMember sender = createAddress(8888);
-    JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
+    JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
     
-    Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, null, Version.CURRENT_ORDINAL);
+    Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL);
     interceptor.up(new Event(Event.MSG, jmsg));
     
     verify(mh, times(1)).processMessage(any(JoinRequestMessage.class));
     
     LeaveRequestMessage lmsg = new LeaveRequestMessage(messenger.localAddress, sender, "testing");
-    jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, null, Version.CURRENT_ORDINAL);
+    jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, Version.CURRENT_ORDINAL);
     interceptor.up(new Event(Event.MSG, jmsg));
     
     verify(manager).processMessage(any(LeaveRequestMessage.class));
@@ -469,7 +482,7 @@ public class JGroupsMessengerJUnitTest {
     NetView v = new NetView(sender);
     when(joinLeave.getView()).thenReturn(v);
     messenger.installView(v);
-    JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
+    JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
     if (mcastMsg) {
       msg.setMulticast(true);
     }
@@ -481,7 +494,7 @@ public class JGroupsMessengerJUnitTest {
         sentMessages == 1);
 
     // send a big message and expect fragmentation
-    msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1);
+    msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1, 0);
 
     // configure an incoming message handler for JoinRequestMessage
     final DistributionMessage[] messageReceived = new DistributionMessage[1];
@@ -697,7 +710,7 @@ public class JGroupsMessengerJUnitTest {
     NetView view = new NetView(mbr);
     
     // the digest should be set in an outgoing join response
-    JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view);
+    JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view, 0);
     messenger.filterOutgoingMessage(joinResponse);
     assertNotNull(joinResponse.getMessengerData());
     
@@ -709,7 +722,7 @@ public class JGroupsMessengerJUnitTest {
     assertNull(joinResponse.getMessengerData());
     
     // the digest shouldn't be set in an outgoing rejection message
-    joinResponse = new JoinResponseMessage("you can't join my distributed system.  nyah nyah nyah!");
+    joinResponse = new JoinResponseMessage("you can't join my distributed system.  nyah nyah nyah!", 0);
     messenger.filterOutgoingMessage(joinResponse);
     assertNull(joinResponse.getMessengerData());
     
@@ -805,7 +818,7 @@ public class JGroupsMessengerJUnitTest {
       dmsg.setRecipients(recipients);
   
       // a message is ignored during manager shutdown
-      msg = messenger.createJGMessage(dmsg, new JGAddress(other), null, Version.CURRENT_ORDINAL);
+      msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
       when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
       receiver.receive(msg);
       verify(manager, never()).processMessage(isA(DistributionMessage.class));
@@ -896,6 +909,174 @@ public class JGroupsMessengerJUnitTest {
     assertFalse(AvailablePort.isPortAvailable(services.getConfig().getDistributionConfig().getMcastPort(), AvailablePort.MULTICAST));
   }
   
+  private NetView createView(InternalDistributedMember otherMbr) {
+    InternalDistributedMember sender = messenger.getMemberID();
+    List<InternalDistributedMember> mbrs = new ArrayList<>();
+    mbrs.add(sender);
+    mbrs.add(otherMbr);
+    NetView v = new NetView(sender, 1, mbrs);
+    return v;
+  }
+  
+  @Test
+  public void testEncryptedFindCoordinatorRequest() throws Exception{
+    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+    
+    Properties p = new Properties();    
+    p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+    initMocks(false, p);
+    
+    NetView v = createView(otherMbr);
+    
+    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+    
+    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+    messenger.initClusterKey();
+    
+    FindCoordinatorRequest gfmsg = new FindCoordinatorRequest(messenger.getMemberID(), new ArrayList<InternalDistributedMember>(2), 1, messenger.getPublickey(messenger.getMemberID()), 1);
+    Set<InternalDistributedMember> recipients = new HashSet<>();
+    recipients.add(otherMbr);
+    gfmsg.setRecipients(recipients);
+    
+    short version = Version.CURRENT_ORDINAL;
+    
+    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+    
+    messenger.writeEncryptedMessage(gfmsg, version, out);
+    
+    byte[] requestBytes = out.toByteArray();
+    
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+    
+    DistributionMessage distributionMessage = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+    
+    assertEquals(gfmsg, distributionMessage);
+  }
+  
+  @Test
+  public void testEncryptedFindCoordinatorResponse() throws Exception{
+    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+    
+    Properties p = new Properties();    
+    p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+    initMocks(false, p);
+    
+    NetView v = createView(otherMbr);
+    
+    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+    otherMbrEncrptor.setPublicKey(messenger.getPublickey(messenger.getMemberID()), messenger.getMemberID());
+    
+    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+    messenger.initClusterKey();
+    
+    FindCoordinatorResponse gfmsg = new FindCoordinatorResponse(messenger.getMemberID(), messenger.getMemberID(),  messenger.getClusterSecretKey(), 1);
+    Set<InternalDistributedMember> recipients = new HashSet<>();
+    recipients.add(otherMbr);
+    gfmsg.setRecipients(recipients);
+    
+    short version = Version.CURRENT_ORDINAL;
+    
+    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+    
+    messenger.writeEncryptedMessage(gfmsg, version, out);
+    
+    byte[] requestBytes = out.toByteArray();
+    
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+    
+    messenger.addRequestId(1, messenger.getMemberID());
+    
+    DistributionMessage distributionMessage = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+    
+    assertEquals(gfmsg, distributionMessage);
+  }
+  
+  @Test
+  public void testEncryptedJoinRequest() throws Exception{
+    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+    
+    Properties p = new Properties();    
+    p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+    initMocks(false, p);
+    
+    NetView v = createView(otherMbr);
+    
+    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+    
+    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+    messenger.initClusterKey();
+    
+    JoinRequestMessage gfmsg = new JoinRequestMessage(otherMbr, messenger.getMemberID(), null, 9789, 1);
+    
+    short version = Version.CURRENT_ORDINAL;
+    
+    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+    
+    messenger.writeEncryptedMessage(gfmsg, version, out);
+    
+    byte[] requestBytes = out.toByteArray();
+    
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+    
+    DistributionMessage distributionMessage = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+    
+    assertEquals(gfmsg, distributionMessage);
+  }
+  
+  @Test
+  public void testEncryptedJoinResponse() throws Exception{
+    InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+    
+    Properties p = new Properties();    
+    p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+    initMocks(false, p);
+    
+    NetView v = createView(otherMbr);
+    
+    GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+    otherMbrEncrptor.setPublicKey(messenger.getPublickey(messenger.getMemberID()), messenger.getMemberID());
+    
+    messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+    messenger.initClusterKey();
+    
+    JoinResponseMessage gfmsg = new JoinResponseMessage(otherMbr, messenger.getClusterSecretKey(), 1);
+    
+    short version = Version.CURRENT_ORDINAL;
+    
+    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+    
+    messenger.writeEncryptedMessage(gfmsg, version, out);
+    
+    byte[] requestBytes = out.toByteArray();
+    
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+    
+    messenger.addRequestId(1, messenger.getMemberID());
+    
+    DistributionMessage gfMessageAtOtherMbr = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+    
+    assertEquals(gfmsg, gfMessageAtOtherMbr);
+    
+    //let's send a view as well
+    
+    InstallViewMessage installViewMessage = new InstallViewMessage(v, null, true);
+    
+    out = new HeapDataOutputStream(Version.CURRENT);
+    
+    messenger.writeEncryptedMessage(installViewMessage, version, out);
+    
+    requestBytes = out.toByteArray();
+    
+    otherMbrEncrptor.addClusterKey(((JoinResponseMessage)gfMessageAtOtherMbr).getSecretPk());
+    
+    dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+    
+    gfMessageAtOtherMbr = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+    
+    assertEquals(installViewMessage, gfMessageAtOtherMbr);
+    
+  }
+  
   /**
    * creates an InternalDistributedMember address that can be used
    * with the doctored JGroups channel.  This includes a logical
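Each of the four testEncrypted* methods above follows the same round trip: writeEncryptedMessage serializes and encrypts into a HeapDataOutputStream, and readEncryptedMessage on the peer's encryptor must reproduce an equal message. A self-contained sketch of that symmetric round trip, assuming (the diff does not show this) a 128-bit AES session key cut from the DH shared secret; the cipher mode below is illustrative, not necessarily GMSEncrypt's actual choice:

import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class EncryptedRoundTripSketch {
  public static void main(String[] args) throws Exception {
    byte[] sharedSecret = new byte[32];
    new SecureRandom().nextBytes(sharedSecret);  //stands in for the DH secret

    //take the first 128 bits of the shared secret as the AES session key
    SecretKeySpec key = new SecretKeySpec(sharedSecret, 0, 16, "AES");
    byte[] iv = new byte[16];
    new SecureRandom().nextBytes(iv);

    byte[] message = "FindCoordinatorRequest payload".getBytes(StandardCharsets.UTF_8);

    //sender side: the writeEncryptedMessage analogue
    Cipher enc = Cipher.getInstance("AES/CBC/PKCS5Padding");
    enc.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
    byte[] wire = enc.doFinal(message);

    //receiver side: the readEncryptedMessage analogue with the same key
    Cipher dec = Cipher.getInstance("AES/CBC/PKCS5Padding");
    dec.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
    byte[] roundTripped = dec.doFinal(wire);

    System.out.println(Arrays.equals(message, roundTripped));  //true
  }
}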



[2/3] incubator-geode git commit: GEODE-1372 added unit test and some more fixes.

Posted by hi...@apache.org.
GEODE-1372 added unit test and some more fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b6a73441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b6a73441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b6a73441

Branch: refs/heads/feature/GEODE-1372
Commit: b6a734415d1bec96c1f84a51b1fca16165bab0ae
Parents: 4d5f947
Author: Hitesh Khamesra <hi...@yahoo.com>
Authored: Wed Jun 1 15:27:24 2016 -0700
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Wed Jun 1 15:27:24 2016 -0700

----------------------------------------------------------------------
 .../internal/membership/NetView.java            |   4 +-
 .../membership/gms/interfaces/Messenger.java    |  12 +
 .../gms/locator/FindCoordinatorRequest.java     |  22 +-
 .../gms/locator/FindCoordinatorResponse.java    |  70 ++-
 .../membership/gms/locator/GMSLocator.java      |  12 +-
 .../membership/gms/membership/GMSJoinLeave.java | 151 +++++--
 .../gms/messages/InstallViewMessage.java        |  25 ++
 .../gms/messages/JoinRequestMessage.java        |  49 ++-
 .../gms/messages/JoinResponseMessage.java       |  63 ++-
 .../membership/gms/messenger/GMSEncrypt.java    |  82 ++--
 .../gms/messenger/JGroupsMessenger.java         | 431 +++++++++++++------
 .../internal/InternalDataSerializer.java        |  18 +
 .../DistributedMulticastRegionDUnitTest.java    |   2 +
 .../gemfire/distributed/LocatorDUnitTest.java   |  77 ++--
 .../LocatorUDPSecurityDUnitTest.java            |  28 ++
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  43 +-
 .../gms/messenger/GMSEncryptJUnitTest.java      |   6 +-
 .../messenger/JGroupsMessengerJUnitTest.java    | 199 ++++++++-
 18 files changed, 1010 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index 365a193..92fbcac 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -586,7 +586,7 @@ public class NetView implements DataSerializableFixedID {
     InternalDataSerializer.writeSet(crashedMembers, out);
     DataSerializer.writeIntArray(failureDetectionPorts, out);
     // TODO expensive serialization
-    DataSerializer.writeObject(publicKeys, out);
+    DataSerializer.writeHashMap(publicKeys, out);
   }
 
   @Override
@@ -598,7 +598,7 @@ public class NetView implements DataSerializableFixedID {
     shutdownMembers = InternalDataSerializer.readHashSet(in);
     crashedMembers = InternalDataSerializer.readHashSet(in);
     failureDetectionPorts = DataSerializer.readIntArray(in);
-    publicKeys = DataSerializer.readObject(in);
+    publicKeys = DataSerializer.readHashMap(in);
   }
 
   /** this will deserialize as an ArrayList */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index e10f325..3e9a2dc 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -86,4 +86,16 @@ public interface Messenger extends Service {
    * @param state the state of that member's outgoing messaging to this member
    */
   void waitForMessageState(InternalDistributedMember member, Map state) throws InterruptedException;
+  
+  byte[] getPublickey(InternalDistributedMember mbr);
+  
+  void setPublicKey(byte[] publickey, InternalDistributedMember mbr);
+  
+  void setClusterSecretKey(byte[] clusterSecretKey);
+  
+  byte[] getClusterSecretKey();
+  
+  int getRequestId();
+  
+  void initClusterKey();
 }
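The six new Messenger methods expose the key-management state used throughout this commit: initClusterKey() creates the cluster secret on the first member, setPublicKey()/getPublickey() maintain the per-member Diffie-Hellman public keys, getClusterSecretKey() hands the secret to joiners, and getRequestId() correlates a FindCoordinatorRequest with its encrypted response. A small sketch of stubbing these accessors, in the same Mockito style the tests in this commit use; the values are placeholders:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;

public class MessengerStubSketch {
  public static void main(String[] args) {
    Messenger messenger = mock(Messenger.class);
    //placeholder values; a real handshake would derive these via initClusterKey()
    when(messenger.getClusterSecretKey()).thenReturn(new byte[16]);
    when(messenger.getRequestId()).thenReturn(1);
    //code under test now sees an "initialized" cluster key without
    //running the real Diffie-Hellman exchange
    System.out.println(messenger.getRequestId());  //1
  }
}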

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
index 5c0a1d1..c434c25 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
 
 public class FindCoordinatorRequest extends HighPriorityDistributionMessage
@@ -35,15 +36,20 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
   private InternalDistributedMember memberID;
   private Collection<InternalDistributedMember> rejectedCoordinators;
   private int lastViewId;
-  
+  private byte[] myPublicKey;
+  private int requestId;   
+
   public FindCoordinatorRequest(InternalDistributedMember myId) {
     this.memberID = myId;
   }
   
-  public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, int lastViewId) {
+  public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, 
+      int lastViewId, byte[] pk, int requestId) {
     this.memberID = myId;
     this.rejectedCoordinators = rejectedCoordinators;
     this.lastViewId = lastViewId;
+    this.myPublicKey = pk;
+    this.requestId = requestId;
   }
   
   public FindCoordinatorRequest() {
@@ -54,6 +60,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
     return memberID;
   }
   
+  public byte[] getMyPublicKey() {
+    return myPublicKey;
+  }
+
   public Collection<InternalDistributedMember> getRejectedCoordinators() {
     return rejectedCoordinators;
   }
@@ -81,6 +91,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
   public int getDSFID() {
     return FIND_COORDINATOR_REQ;
   }
+  
+  public int getRequestId() {
+    return requestId;
+  }
 
   @Override
   public void toData(DataOutput out) throws IOException {
@@ -94,6 +108,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
       out.writeInt(0);
     }
     out.writeInt(lastViewId);
+    out.writeInt(requestId);
+    InternalDataSerializer.writeByteArray(this.myPublicKey, out);
   }
 
   @Override
@@ -105,6 +121,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
       this.rejectedCoordinators.add((InternalDistributedMember)DataSerializer.readObject(in));
     }
     this.lastViewId = in.readInt();
+    this.requestId = in.readInt();
+    this.myPublicKey = InternalDataSerializer.readByteArray(in);
   }
 
   @Override
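The toData/fromData hunks above keep the wire format symmetric: the new requestId and public-key fields are appended in the same order on both sides, with the byte array length-prefixed via InternalDataSerializer.writeByteArray/readByteArray. A generic, self-contained sketch of that contract (the length-prefix framing here is assumed, mirroring writeByteArray):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WireSymmetrySketch {
  static void toData(DataOutputStream out, int requestId, byte[] publicKey) throws IOException {
    out.writeInt(requestId);
    out.writeInt(publicKey.length);  //length prefix (assumed framing)
    out.write(publicKey);
  }

  static byte[] fromData(DataInputStream in, int[] requestIdOut) throws IOException {
    requestIdOut[0] = in.readInt();  //read in the same order toData wrote
    byte[] publicKey = new byte[in.readInt()];
    in.readFully(publicKey);
    return publicKey;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    toData(new DataOutputStream(buf), 42, new byte[] { 1, 2, 3 });
    int[] requestId = new int[1];
    byte[] pk = fromData(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), requestId);
    System.out.println(requestId[0] + " / " + pk.length);  //prints "42 / 3"
  }
}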

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 0427cb4..07f0e58 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -19,6 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.locator;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -42,12 +43,14 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
   private boolean networkPartitionDetectionEnabled;
   private boolean usePreferredCoordinators;
   private boolean isShortForm;
-  
+  private byte[] coordinatorPublicKey;  
+
+  private int requestId;
   
   public FindCoordinatorResponse(InternalDistributedMember coordinator,
       InternalDistributedMember senderId,
       boolean fromView, NetView view, HashSet<InternalDistributedMember> registrants,
-      boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators) {
+      boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators, byte[] pk) {
     this.coordinator = coordinator;
     this.senderId = senderId;
     this.fromView = fromView;
@@ -56,19 +59,30 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
     this.networkPartitionDetectionEnabled = networkPartitionDectionEnabled;
     this.usePreferredCoordinators = usePreferredCoordinators;
     this.isShortForm = false;
+    this.coordinatorPublicKey = pk;
   }
   
   public FindCoordinatorResponse(InternalDistributedMember coordinator,
-      InternalDistributedMember senderId) {
+      InternalDistributedMember senderId, byte[] pk, int requestId) {
     this.coordinator = coordinator;
     this.senderId = senderId;
     this.isShortForm = true;
+    this.coordinatorPublicKey = pk;
+    this.requestId = requestId;
   }
   
   public FindCoordinatorResponse() {
     // no-arg constructor for serialization
   }
 
+  public byte[] getCoordinatorPublicKey() {
+    return coordinatorPublicKey;
+  }
+  
+  public int getRequestId() {
+    return requestId;
+  }
+  
   public boolean isNetworkPartitionDetectionEnabled() {
     return networkPartitionDetectionEnabled;
   }
@@ -131,6 +145,7 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
   public void toData(DataOutput out) throws IOException {
     DataSerializer.writeObject(coordinator, out);
     DataSerializer.writeObject(senderId, out);
+    InternalDataSerializer.writeByteArray(coordinatorPublicKey, out);
     out.writeBoolean(isShortForm);
     out.writeBoolean(fromView);
     out.writeBoolean(networkPartitionDetectionEnabled);
@@ -143,7 +158,8 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     coordinator = DataSerializer.readObject(in);
     senderId = DataSerializer.readObject(in);
-    isShortForm = in.readBoolean();
+    coordinatorPublicKey = InternalDataSerializer.readByteArray(in);
+    isShortForm = in.readBoolean();    
     if (!isShortForm) {
       fromView = in.readBoolean();
       networkPartitionDetectionEnabled = in.readBoolean();
@@ -158,4 +174,50 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
     throw new IllegalStateException("this message should not be executed");
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    FindCoordinatorResponse other = (FindCoordinatorResponse) obj;
+    if (coordinator == null) {
+      if (other.coordinator != null)
+        return false;
+    } else if (!coordinator.equals(other.coordinator))
+      return false;
+    if (!Arrays.equals(coordinatorPublicKey, other.coordinatorPublicKey))
+      return false;
+    if (fromView != other.fromView)
+      return false;
+    if (isShortForm != other.isShortForm)
+      return false;
+    if (networkPartitionDetectionEnabled != other.networkPartitionDetectionEnabled)
+      return false;
+    if (registrants == null) {
+      if (other.registrants != null)
+        return false;
+    } else if (!registrants.equals(other.registrants))
+      return false;
+    // requestId is not sent as part of the FindCoordinatorResponse wire format
+    /*if (requestId != other.requestId)
+      return false;*/
+    if (senderId == null) {
+      if (other.senderId != null)
+        return false;
+    } else if (!senderId.equals(other.senderId))
+      return false;
+    if (usePreferredCoordinators != other.usePreferredCoordinators)
+      return false;
+    if (view == null) {
+      if (other.view != null)
+        return false;
+    } else if (!view.equals(other.view))
+      return false;
+    return true;
+  }
+
+  
 }
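
The new coordinatorPublicKey field is written right after the sender id and read back in the same position, so toData and fromData must stay in lock-step. A minimal round-trip sketch under those assumptions (coord, self and pkBytes are placeholder values; the scaffolding is hypothetical and not part of this commit, though HeapDataOutputStream and Version are the same classes the messenger uses below):

    // hypothetical round-trip check for the new wire format
    FindCoordinatorResponse sent = new FindCoordinatorResponse(coord, self,
        false, null, new HashSet<InternalDistributedMember>(),
        true, false, pkBytes);
    HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
    sent.toData(hdos);
    FindCoordinatorResponse received = new FindCoordinatorResponse();
    received.fromData(new DataInputStream(
        new ByteArrayInputStream(hdos.toByteArray())));
    assert sent.equals(received); // equals() above ignores requestId by design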

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
index aab9002..402940b 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -167,7 +167,7 @@ public class GMSLocator implements Locator, NetLocator {
       }
     } else if (request instanceof FindCoordinatorRequest) {
       FindCoordinatorRequest findRequest = (FindCoordinatorRequest)request;
-      
+      services.getMessenger().setPublicKey(findRequest.getMyPublicKey(), findRequest.getMemberID());
       if (findRequest.getMemberID() != null) {
         InternalDistributedMember coord = null;
 
@@ -228,9 +228,17 @@ public class GMSLocator implements Locator, NetLocator {
         }
         
         synchronized(registrants) {
+          byte[] coordPk = null; 
+          if(view != null) {
+            coordPk = (byte[])view.getPublicKey(coord);            
+          }
+          if (coordPk == null) {
+            coordPk = services.getMessenger().getPublickey(coord);
+          }
           response = new FindCoordinatorResponse(coord, localAddress,
               fromView, view, new HashSet<InternalDistributedMember>(registrants),
-              this.networkPartitionDetectionEnabled, this.usePreferredCoordinators);
+              this.networkPartitionDetectionEnabled, this.usePreferredCoordinators, 
+              coordPk);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index d70884a..3cd634a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -39,6 +39,7 @@ import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
+
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
@@ -361,8 +362,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     } else {
       logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
       int port = services.getHealthMonitor().getFailureDetectionPort();
-      JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port);
-      services.getMessenger().send(req, state.view);
+      JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port, 
+          services.getMessenger().getRequestId());
+      //services.getMessenger().send(req, state.view);
+      services.getMessenger().send(req);
     }
 
     JoinResponseMessage response = null;
@@ -414,6 +417,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         NetView v = response.getCurrentView();
         InternalDistributedMember coord = v.getCoordinator();
         if (searchState.alreadyTried.contains(coord)) {
+          searchState.view = response.getCurrentView();
           // we already sent join request to it..so lets wait some more time here
           // assuming we got this response immediately, so wait for same timeout here..
           long timeout = Math.max(services.getConfig().getMemberTimeout(), services.getConfig().getJoinTimeout() / 5);
@@ -421,7 +425,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           response = joinResponse[0];
         } else {
           // try on this coordinator
-          searchState.possibleCoordinator = coord;
+          searchState.view = response.getCurrentView();
           response = null;
         }
         searchState.view = v;
@@ -467,7 +471,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
     if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
       logger.warn("detected an attempt to start a peer using an older version of the product {}", incomingRequest.getMemberID());
-      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
+      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version", incomingRequest.getRequestId());
       m.setRecipient(incomingRequest.getMemberID());
       services.getMessenger().send(m);
       return;
@@ -480,7 +484,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       rejection = e.getMessage();
     }
     if (rejection != null && rejection.length() > 0) {
-      JoinResponseMessage m = new JoinResponseMessage(rejection);
+      JoinResponseMessage m = new JoinResponseMessage(rejection, 0);
       m.setRecipient(incomingRequest.getMemberID());
       services.getMessenger().send(m);
       return;
@@ -632,13 +636,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     logger.debug("JoinLeave is recording the request to be processed in the next membership view");
     synchronized (viewRequests) {
       viewRequests.add(request);
+      if (viewCreator != null) {
+        boolean joinResponseSent = viewCreator.informToPendingJoinRequests();
+
+        if (!joinResponseSent && request instanceof JoinRequestMessage) {
+          JoinRequestMessage jreq = (JoinRequestMessage) request;
+          // send the cluster secret key now; the member has already authenticated at this point
+          JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(), services.getMessenger().getClusterSecretKey(), jreq.getRequestId());
+          services.getMessenger().send(response);
+        }
+      }
       viewRequests.notifyAll();
     }
-    if (viewCreator != null) {
-      viewCreator.informToPendingJoinRequests();
-    }
+    
   }
-
+  
   // for testing purposes, returns a copy of the view requests for verification
   List<DistributionMessage> getViewRequests() {
     synchronized (viewRequests) {
@@ -710,6 +722,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       if (newView != null) {
         viewCreator.setInitialView(newView, newView.getNewMembers(), newView.getShutdownMembers(), newView.getCrashedMembers());
       }
+      services.getMessenger().initClusterKey();
       viewCreator.setDaemon(true);
       logger.info("ViewCreator starting on:" + localAddress);
       viewCreator.start();
@@ -752,7 +765,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   private void sendJoinResponses(NetView newView, List<InternalDistributedMember> newMbrs) {
     for (InternalDistributedMember mbr : newMbrs) {
-      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+      JoinResponseMessage response = new JoinResponseMessage(mbr, newView, 0);
       services.getMessenger().send(response);
     }
   }
@@ -770,14 +783,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
 
-  private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
-    for (InternalDistributedMember mbr : newMbrs) {
-      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
-      services.getMessenger().send(response);
-    }
-  }
-
-
   boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException {
     return sendView(view, newMembers, true, this.prepareProcessor);
   }
@@ -841,7 +846,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     pendingRemovals.removeAll(view.getCrashedMembers());
     viewReplyProcessor.initialize(id, responders);
     viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals);
-    services.getMessenger().send(msg, view);
+    addPublickeysToView(view);
+    services.getMessenger().send(msg);
 
     // only wait for responses during preparation
     if (preparing) {
@@ -868,13 +874,32 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     return true;
   }
 
+  private void addPublickeysToView(NetView view) {
+    //TODO: is this check correct?
+    if (services != null && services.getConfig() != null && services.getConfig().getDistributionConfig() != null) {
+      String sDHAlgo = services.getConfig().getDistributionConfig().getSecurityClientDHAlgo();
+      if (sDHAlgo != null && !sDHAlgo.isEmpty()) {
+        List<InternalDistributedMember> mbrs = view.getMembers();
+        Iterator<InternalDistributedMember> itr = mbrs.iterator();
+
+        while (itr.hasNext()) {
+          InternalDistributedMember mbr = itr.next();
+          byte[] pk = services.getMessenger().getPublickey(mbr);
+          view.setPublicKey(mbr, pk);
+        }
+      }
+    }
+  }
   private void processViewMessage(final InstallViewMessage m) {
 
     NetView view = m.getView();
     
     if(currentView != null && !currentView.contains(m.getSender())) {
-      logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView);
-      return;
+      if (this.preparedView == null || !this.preparedView.contains(m.getSender())) {
+        logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView);
+        return;
+      }
     }
 
     if (currentView != null && view.getViewId() < currentView.getViewId()) {
@@ -927,7 +952,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   private void ackView(InstallViewMessage m) {
     if (!playingDead && m.getView().contains(m.getView().getCreator())) {
-      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()), m.getView());
+      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
     }
   }
 
@@ -968,7 +993,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       return findCoordinatorFromView();
     }
 
-    FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
+    FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId, 
+        services.getMessenger().getPublickey(localAddress), services.getMessenger().getRequestId());
     Set<InternalDistributedMember> possibleCoordinators = new HashSet<InternalDistributedMember>();
     Set<InternalDistributedMember> coordinatorsWithView = new HashSet<InternalDistributedMember>();
 
@@ -988,6 +1014,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout);
           FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null;
           if (response != null) {
+            setCoordinatorPublicKey(response);
             state.locatorsContacted++;
             if (!state.hasContactedAJoinedLocator &&
                 response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
@@ -1084,16 +1111,35 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     if (state.registrants != null) {
       recipients.addAll(state.registrants);
     }
-    recipients.remove(localAddress);
-    FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId);
-    req.setRecipients(v.getMembers());
+    recipients.remove(localAddress);    
 
+   // FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+     //   localAddress), services.getMessenger().getRequestId());
+    //req.setRecipients(v.getMembers());
+    
     boolean testing = unitTesting.contains("findCoordinatorFromView");
     synchronized (state.responses) {
       if (!testing) {
         state.responses.clear();
       }
-      services.getMessenger().send(req);
+      
+      if (!services.getConfig().getDistributionConfig().getSecurityClientDHAlgo().isEmpty()) {
+        for (InternalDistributedMember mbr : v.getMembers()) {
+          Set<InternalDistributedMember> r = new HashSet<>();
+          r.add(mbr);
+          FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+              localAddress), services.getMessenger().getRequestId());
+          req.setRecipients(r);
+
+          services.getMessenger().send(req, v);
+        }
+      } else {
+        FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+            localAddress), services.getMessenger().getRequestId());
+        req.setRecipients(v.getMembers());
+
+        services.getMessenger().send(req, v);
+      }
       try {
         if (!testing) {
           state.responses.wait(DISCOVERY_TIMEOUT);
@@ -1144,8 +1190,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   private void processJoinResponse(JoinResponseMessage rsp) {
     synchronized (joinResponse) {
       if (!this.isJoined) {
-        joinResponse[0] = rsp;
-        joinResponse.notifyAll();
+        //1. our joinRequest rejected.
+        //2. Member which was coordinator but just now some other member became coordinator
+        //3. we got message with secret key, but still view is coming and that will inform the joining thread
+        if (rsp.getRejectionMessage() != null || rsp.getCurrentView() != null) {
+          joinResponse[0] = rsp;
+          joinResponse.notifyAll();
+        } else {
+          //we got secret key lets add it
+          services.getMessenger().setClusterSecretKey(rsp.getSecretPk());
+        }
       }
     }
   }
@@ -1170,9 +1224,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     FindCoordinatorResponse resp;
     if (this.isJoined) {
       NetView v = currentView;
-      resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress);
+      resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress, 
+          services.getMessenger().getPublickey(v.getCoordinator()), req.getRequestId());
     } else {
-      resp = new FindCoordinatorResponse(localAddress, localAddress);
+      resp = new FindCoordinatorResponse(localAddress, localAddress, 
+          services.getMessenger().getPublickey(localAddress), req.getRequestId());
     }
     resp.setRecipient(req.getMemberID());
     services.getMessenger().send(resp);
@@ -1183,6 +1239,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       searchState.responses.add(resp);
       searchState.responses.notifyAll();
     }
+    setCoordinatorPublicKey(resp);
+  }
+  
+  private void setCoordinatorPublicKey(FindCoordinatorResponse response) {
+    if (response.getCoordinator() != null && response.getCoordinatorPublicKey() != null)
+      services.getMessenger().setPublicKey(response.getCoordinatorPublicKey(), response.getCoordinator());
   }
 
   private void processNetworkPartitionMessage(NetworkPartitionMessage msg) {
@@ -1993,36 +2055,40 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     }
     
-    synchronized void informToPendingJoinRequests() {
+    synchronized boolean informToPendingJoinRequests() {
+      boolean joinResponseSent = false;
       if (!shutdown) {
-        return;
+        return joinResponseSent;
       }
-
       ArrayList<DistributionMessage> requests = new ArrayList<>();
       synchronized (viewRequests) {
         if (viewRequests.size() > 0) {
           requests.addAll(viewRequests);
         } else {
-          return;
+          return joinResponseSent;
         }
         viewRequests.clear();
       }
 
+      
       for (DistributionMessage msg : requests) {
         switch (msg.getDSFID()) {
         case JOIN_REQUEST:
-          logger.info("Informing to pending join requests {}", msg);
-
+      
           NetView v = currentView;
+          logger.info("Informing to pending join requests {} myid {} coord {}", msg, localAddress, v.getCoordinator());
           if (!v.getCoordinator().equals(localAddress)) {
+            joinResponseSent = true;
             //lets inform that coordinator has been changed
-            JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v);
+            JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v, ((JoinRequestMessage) msg).getRequestId());
             services.getMessenger().send(jrm);
           }
         default:
           break;
         }
       }
+      
+      return joinResponseSent;
     }
 
     /**
@@ -2033,7 +2099,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     void createAndSendView(List<DistributionMessage> requests) throws InterruptedException {
       List<InternalDistributedMember> joinReqs = new ArrayList<>(10);
       Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10);
-      Map<InternalDistributedMember, Object> joinKeys = new HashMap<>(10);
       Set<InternalDistributedMember> leaveReqs = new HashSet<>(10);
       List<InternalDistributedMember> removalReqs = new ArrayList<>(10);
       List<String> removalReasons = new ArrayList<String>(10);
@@ -2067,7 +2132,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           if (!joinReqs.contains(mbr)) {
             joinReqs.add(mbr);
             joinPorts.put(mbr, port);
-            joinKeys.put(mbr, jmsg.getPublicKey());
           }
           break;
         case LEAVE_REQUEST_MESSAGE:
@@ -2137,7 +2201,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         for (InternalDistributedMember mbr : joinReqs) {
           if (mbrs.contains(mbr)) {
             newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
-            newView.setPublicKey(mbr, joinKeys.get(mbr));
           }
         }
         if (currentView != null) {
@@ -2161,7 +2224,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         return;
       }
 
-      sendJoinResponses(joinReqs, newView);
+      // join responses were already sent when the join request arrived
+      //sendJoinResponses(newView, joinReqs);
 
       // send removal messages before installing the view so we stop
       // getting messages from members that have been kicked out
@@ -2289,7 +2353,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
       // we also send a join response so that information like the multicast message digest
       // can be transmitted to the new members w/o including it in the view message
-      sendJoinResponses(newView, joinReqs);
+      // join responses were already sent when the join request arrived
+      //sendJoinResponses(newView, joinReqs);
 
       if (markViewCreatorForShutdown && getViewCreator() != null) {
         shutdown = true;
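
To make the encrypted join handshake above easier to follow, here is the request/response correlation in condensed form; this is an illustrative restatement of the code in this patch (credentials, fdPort and joiner are placeholders), not additional API:

    // joiner side: a fresh requestId travels with the join request
    int reqId = services.getMessenger().getRequestId();
    JoinRequestMessage req = new JoinRequestMessage(coord, localAddress,
        credentials, fdPort, reqId);
    services.getMessenger().send(req);

    // coordinator side: the same id is echoed back so the messenger can look
    // up the joiner's public key when encrypting the response
    JoinResponseMessage rsp = new JoinResponseMessage(joiner,
        services.getMessenger().getClusterSecretKey(), req.getRequestId());
    services.getMessenger().send(rsp);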

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
index c41584f..224fef1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
@@ -110,4 +110,29 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
              +")";
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    InstallViewMessage other = (InstallViewMessage) obj;
+    if (credentials == null) {
+      if (other.credentials != null)
+        return false;
+    } else if (!credentials.equals(other.credentials))
+      return false;
+    if (kind != other.kind)
+      return false;
+    if (previousViewId != other.previousViewId)
+      return false;
+    if (view == null) {
+      if (other.view != null)
+        return false;
+    } else if (!view.equals(other.view))
+      return false;
+    return true;
+  }  
 }
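
Note that equals() is added here (and on the other membership messages in this patch) without a matching hashCode(), which violates the Object contract if these messages are ever used as keys in hash-based collections. A possible companion, sketched for the fields compared above and assuming kind is an enum and previousViewId an int (not part of this commit):

    @Override
    public int hashCode() {
      final int prime = 31;
      int result = 1;
      result = prime * result + ((credentials == null) ? 0 : credentials.hashCode());
      result = prime * result + ((kind == null) ? 0 : kind.hashCode());
      result = prime * result + previousViewId;
      result = prime * result + ((view == null) ? 0 : view.hashCode());
      return result;
    }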

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 5545935..b282daa 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -30,25 +30,25 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   private InternalDistributedMember memberID;
   private Object credentials;
   private int failureDetectionPort = -1;
-  private Object publicKey;
-  
+  private int requestId;
+    
   public JoinRequestMessage(InternalDistributedMember coord,
-                            InternalDistributedMember id, Object credentials, int fdPort) {
+                            InternalDistributedMember id, Object credentials, int fdPort, int requestId) {
     super();
     setRecipient(coord);
     this.memberID = id;
     this.credentials = credentials;
-    this.publicKey = null;
     this.failureDetectionPort = fdPort;
+    this.requestId = requestId;
   }
   public JoinRequestMessage() {
     // no-arg constructor for serialization
   }
 
-  public void setPublicKey(Object key) {
-    this.publicKey = key;
+  public int getRequestId() {
+    return requestId;
   }
-
+  
   @Override
   public int getDSFID() {
     return JOIN_REQUEST;
@@ -67,10 +67,6 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
     return credentials;
   }
 
-  public Object getPublicKey() {
-    return publicKey;
-  }
-  
   @Override
   public String toString() {
     return getShortClassName() + "(" + memberID + (credentials==null? ")" : "; with credentials)") + " failureDetectionPort:" + failureDetectionPort;
@@ -85,24 +81,49 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   public void toData(DataOutput out) throws IOException {
     DataSerializer.writeObject(memberID, out);
     DataSerializer.writeObject(credentials, out);
-    DataSerializer.writeObject(publicKey, out);
     DataSerializer.writePrimitiveInt(failureDetectionPort, out);
     // preserve the multicast setting so the receiver can tell
     // if this is a mcast join request
     out.writeBoolean(getMulticast());
+    out.writeInt(requestId);
   }
 
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     memberID = DataSerializer.readObject(in);
     credentials = DataSerializer.readObject(in);
-    publicKey = DataSerializer.readObject(in);
     failureDetectionPort = DataSerializer.readPrimitiveInt(in);
     setMulticast(in.readBoolean());
+    requestId = in.readInt();
   }
 
   public int getFailureDetectionPort() {
     return failureDetectionPort;
   }
-
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    JoinRequestMessage other = (JoinRequestMessage) obj;
+    if (credentials == null) {
+      if (other.credentials != null)
+        return false;
+    } else if (!credentials.equals(other.credentials))
+      return false;
+    if (failureDetectionPort != other.failureDetectionPort)
+      return false;
+    if (memberID == null) {
+      if (other.memberID != null)
+        return false;
+    } else if (!memberID.equals(other.memberID))
+      return false;
+    if (requestId != other.requestId)
+      return false;
+    return true;
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index ad9c319..ff20a4e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,20 +38,39 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
   private String rejectionMessage;
   private InternalDistributedMember memberID;
   private byte[] messengerData;
-  
-  public JoinResponseMessage(InternalDistributedMember memberID, NetView view) {
+  private int requestId;
+  private byte[] secretPk;
+    
+  public JoinResponseMessage(InternalDistributedMember memberID, NetView view, int requestId) {
     this.currentView = view;
     this.memberID = memberID;
+    this.requestId = requestId;
     setRecipient(memberID);
   }
   
-  public JoinResponseMessage(String rejectionMessage) {
+  public JoinResponseMessage(InternalDistributedMember memberID, byte[] sPk, int requestId) {
+    this.memberID = memberID;
+    this.requestId = requestId;
+    this.secretPk = sPk;
+    setRecipient(memberID);
+  }
+  
+  public JoinResponseMessage(String rejectionMessage, int requestId) {
     this.rejectionMessage = rejectionMessage;
+    this.requestId = requestId;
   }
   
   public JoinResponseMessage() {
     // no-arg constructor for serialization
   }
+  
+  public byte[] getSecretPk() {
+    return secretPk;
+  }
+  
+  public int getRequestId() {
+    return requestId;
+  }
 
   public NetView getCurrentView() {
     return currentView;
@@ -101,6 +121,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
     DataSerializer.writeObject(memberID, out);
     DataSerializer.writeString(rejectionMessage, out);
     DataSerializer.writeByteArray(messengerData, out);
+    DataSerializer.writeByteArray(secretPk, out);
   }
 
   @Override
@@ -109,6 +130,42 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
     memberID = DataSerializer.readObject(in);
     rejectionMessage = DataSerializer.readString(in);
     messengerData = DataSerializer.readByteArray(in);
+    secretPk = DataSerializer.readByteArray(in);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    JoinResponseMessage other = (JoinResponseMessage) obj;
+    if (currentView == null) {
+      if (other.currentView != null)
+        return false;
+    } else if (!currentView.equals(other.currentView))
+      return false;
+    if (memberID == null) {
+      if (other.memberID != null)
+        return false;
+    } else if (!memberID.equals(other.memberID))
+      return false;
+    if (!Arrays.equals(messengerData, other.messengerData))
+      return false;
+    if (rejectionMessage == null) {
+      if (other.rejectionMessage != null)
+        return false;
+    } else if (!rejectionMessage.equals(other.rejectionMessage))
+      return false;
+    // requestId is not sent as part of the JoinResponseMessage wire format
+    /*if (requestId != other.requestId)
+      return false;*/
+    if (!Arrays.equals(secretPk, other.secretPk))
+      return false;
+    return true;
   }
 
+  
 }
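
The three constructor shapes above correspond to the three response cases handled in GMSJoinLeave.processJoinResponse; in condensed, illustrative form (member, view, secretKey and requestId are placeholders):

    new JoinResponseMessage(member, view, requestId);       // normal join: carries the view
    new JoinResponseMessage(member, secretKey, requestId);  // early reply: cluster secret key only
    new JoinResponseMessage("rejection reason", requestId); // join rejected: message text only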

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
index f307290..047bb03 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -23,7 +23,6 @@ import java.security.spec.X509EncodedKeySpec;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.crypto.Cipher;
@@ -37,18 +36,14 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.distributed.internal.membership.NetView;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 
-import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.logging.LogService;
 
 public class GMSEncrypt implements Cloneable {
 
   public static long encodingsPerformed;
   public static long decodingsPerformed;
 
-  private static final Logger logger = LogService.getLogger();
-
   // Parameters for the Diffie-Hellman key exchange
   private static final BigInteger dhP = new BigInteger("13528702063991073999718992897071702177131142188276542919088770094024269" + "73079899070080419278066109785292538223079165925365098181867673946"
       + "34756714063947534092593553024224277712367371302394452615862654308" + "11180902979719649450105660478776364198726078338308557022096810447" + "3500348898008043285865193451061481841186553");
@@ -74,27 +69,27 @@ public class GMSEncrypt implements Cloneable {
 
   private ClusterEncryptor clusterEncryptor;
 
-  protected void installView(NetView view) throws Exception {
+  protected void installView(NetView view) {
     this.view = view;
     this.view.setPublicKey(services.getJoinLeave().getMemberID(), getPublicKeyBytes());
   }
 
-  protected void installView(NetView view, InternalDistributedMember mbr) throws Exception {
+  protected void installView(NetView view, InternalDistributedMember mbr) {
     this.view = view;
-    // this.view.setPublicKey(mbr, getPublicKeyBytes());
-    // TODO remove ciphers for departed members
-    // addClusterKey();
   }
 
-  protected byte[] getSecretBytes() {
+  protected byte[] getClusterSecretKey() {
     return this.clusterEncryptor.secretBytes;
   }
 
-  protected synchronized void addClusterKey() throws Exception {
-    this.clusterEncryptor = new ClusterEncryptor(this);
+  protected synchronized void initClusterSecretKey() throws Exception {
+    if(this.clusterEncryptor == null) {
+      this.clusterEncryptor = new ClusterEncryptor(this);
+    }
   }
 
-  protected synchronized void addClusterKey(byte[] secretBytes) throws Exception {
+  protected synchronized void addClusterKey(byte[] secretBytes) {
+    //TODO: we reset the cluster encryptor here, in case there is a race
     this.clusterEncryptor = new ClusterEncryptor(secretBytes);
   }
 
@@ -124,6 +119,11 @@ public class GMSEncrypt implements Cloneable {
   public byte[] decryptData(byte[] data) throws Exception {
     return this.clusterEncryptor.decryptBytes(data);
   }
+  
+  public byte[] decryptData(byte[] data, byte[] pkBytes) throws Exception {
+    PeerEncryptor encryptor = new PeerEncryptor(pkBytes);
+    return encryptor.decryptBytes(data);    
+  }
 
   public byte[] encryptData(byte[] data) throws Exception {
     return this.clusterEncryptor.encryptBytes(data);
@@ -132,6 +132,26 @@ public class GMSEncrypt implements Cloneable {
   protected byte[] getPublicKeyBytes() {
     return dhPublicKey.getEncoded();
   }
+  
+  protected byte[] getPublicKey(InternalDistributedMember member) {
+    try {
+      InternalDistributedMember localMbr = services.getMessenger().getMemberID();
+      if (localMbr != null && localMbr.equals(member)) {
+        return this.dhPublicKey.getEncoded();// local one
+      }
+      return getPeerEncryptor(member).peerPublicKey.getEncoded();
+    } catch (Exception e) {
+      throw new RuntimeException("Not found public key for member " + member, e);
+    }
+  }
+  
+  protected void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
+    try {
+      createPeerEncryptor(mbr, publickey);
+    }catch(Exception e) {
+      throw new RuntimeException("Unable to create peer encryptor " +  mbr, e);
+    }
+  }
 
   @Override
   protected GMSEncrypt clone() throws CloneNotSupportedException {
@@ -142,7 +162,6 @@ public class GMSEncrypt implements Cloneable {
 
       X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(this.dhPublicKey.getEncoded());
       KeyFactory keyFact = KeyFactory.getInstance("DH");
-      // PublicKey pubKey = keyFact.generatePublic(x509KeySpec);
       gmsEncrypt.dhPublicKey = keyFact.generatePublic(x509KeySpec);
       final String format = this.dhPrivateKey.getFormat();
       System.out.println("private key format " + format);
@@ -180,16 +199,20 @@ public class GMSEncrypt implements Cloneable {
     }
   }
 
-  protected synchronized PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
+  protected PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
     PeerEncryptor result = memberToPeerEncryptor.get(member);
     if (result == null) {
-      result = createPeerEncryptor(member);
+      synchronized (this) {
+        result = memberToPeerEncryptor.get(member);
+        if (result == null) {
+          result = createPeerEncryptor(member, (byte[]) view.getPublicKey(member));
+        }
+      }
     }
     return result;
   }
 
-  private PeerEncryptor createPeerEncryptor(InternalDistributedMember member) throws Exception {
-    byte[] peerKeyBytes = (byte[]) view.getPublicKey(member);
+  private PeerEncryptor createPeerEncryptor(InternalDistributedMember member, byte[] peerKeyBytes) throws Exception {
     PeerEncryptor result = new PeerEncryptor(peerKeyBytes);
     memberToPeerEncryptor.put(member, result);
     return result;
@@ -274,7 +297,7 @@ public class GMSEncrypt implements Cloneable {
       this.peerPublicKey = getPublicKey(peerPublicKeyBytes);
     }
 
-    public byte[] encryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] encryptBytes(byte[] data) throws Exception {
       String algo = null;
       if (this.peerSKAlgo != null) {
         algo = this.peerSKAlgo;
@@ -295,7 +318,7 @@ public class GMSEncrypt implements Cloneable {
       return encrypt;
     }
 
-    public byte[] decryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] decryptBytes(byte[] data) throws Exception {
       String algo = null;
       if (this.peerSKAlgo != null) {
         algo = this.peerSKAlgo;
@@ -352,7 +375,7 @@ public class GMSEncrypt implements Cloneable {
     int blocksize = getBlockSize(dhSKAlgo);
 
     if (keysize == -1 || blocksize == -1) {
-      // TODO how should we do here
+      // TODO: what should we do here? should we just throw a runtime exception?
       /* SecretKey sKey = ka.generateSecret(dhSKAlgo);
        * encrypt = Cipher.getInstance(dhSKAlgo);
        * encrypt.init(Cipher.ENCRYPT_MODE, sKey); */
@@ -403,7 +426,7 @@ public class GMSEncrypt implements Cloneable {
     int blocksize = getBlockSize(dhSKAlgo);
 
     if (keysize == -1 || blocksize == -1) {
-      // TODO: how to do here
+      // TODO: what should we do here? should we just throw a runtime exception?
       /* SecretKey sKey = ka.generateSecret(dhSKAlgo);
        * decrypt = Cipher.getInstance(dhSKAlgo);
        * decrypt.init(Cipher.DECRYPT_MODE, sKey); */
@@ -431,8 +454,6 @@ public class GMSEncrypt implements Cloneable {
       SecretKey sKey = ka.generateSecret(dhSKAlgo);
       return sKey.getEncoded();
     } else {
-      String algoStr = getDhAlgoStr(dhSKAlgo);
-
       return ka.generateSecret();
     }
   }
@@ -453,16 +474,13 @@ public class GMSEncrypt implements Cloneable {
 
   /***
    * this will hold the common key for cluster
-   * that will be created using publickey of all the members..
-   *
    */
   protected class ClusterEncryptor {
     byte[] secretBytes;
+    //TODO: need to check whether this is thread safe
     Cipher encrypt;
     Cipher decrypt;
-    int viewId;
-    Set<InternalDistributedMember> mbrs;
-
+   
     public ClusterEncryptor(GMSEncrypt other) throws Exception {
       GMSEncrypt mine = new GMSEncrypt(other.services);
       this.secretBytes = GMSEncrypt.generateSecret(mine.dhSKAlgo, mine.dhPrivateKey, other.dhPublicKey);
@@ -472,7 +490,7 @@ public class GMSEncrypt implements Cloneable {
       this.secretBytes = sb;
     }
 
-    public byte[] encryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] encryptBytes(byte[] data) throws Exception {
       String algo = dhSKAlgo;
       return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo));
     }
@@ -491,7 +509,7 @@ public class GMSEncrypt implements Cloneable {
       return encrypt;
     }
 
-    public byte[] decryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] decryptBytes(byte[] data) throws Exception {
       String algo = dhSKAlgo;
       Cipher c = getDecryptCipher(algo);
       return GMSEncrypt.decryptBytes(data, c);
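
For context, both the per-peer and the cluster encryptors above derive their secrets from a standard JCE Diffie-Hellman agreement over the fixed parameters declared at the top of the class. A minimal, self-contained sketch of that derivation, assuming dhG and dhL companions to the dhP constant shown above (standard javax.crypto APIs; peerPublicKey stands in for a key read from the view or a request):

    import java.math.BigInteger;
    import java.security.KeyPair;
    import java.security.KeyPairGenerator;
    import java.security.PublicKey;
    import javax.crypto.KeyAgreement;
    import javax.crypto.spec.DHParameterSpec;

    static byte[] deriveSharedSecret(BigInteger p, BigInteger g, int l,
                                     PublicKey peerPublicKey) throws Exception {
      // generate a DH key pair over the shared group parameters
      KeyPairGenerator kpg = KeyPairGenerator.getInstance("DH");
      kpg.initialize(new DHParameterSpec(p, g, l));
      KeyPair pair = kpg.generateKeyPair();

      // agree on a shared secret using the peer's public key
      KeyAgreement ka = KeyAgreement.getInstance("DH");
      ka.init(pair.getPrivate());
      ka.doPhase(peerPublicKey, true);
      return ka.generateSecret(); // seeds the symmetric cipher
    }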

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index cba5d5f..eedfc5f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -19,10 +19,13 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -33,12 +36,15 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -80,10 +86,13 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
 import com.gemstone.gemfire.internal.ClassPathLoader;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.OSProcess;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.Version;
@@ -256,11 +265,11 @@ public class JGroupsMessenger implements Messenger {
     if ( !dc.getSecurityClientDHAlgo().isEmpty() ) {
       try {
         this.encrypt = new GMSEncrypt(services);
+        logger.info("Initializing GMSEncrypt ");
       } catch (Exception e) {
         throw new GemFireConfigException("problem initializing encryption protocol", e);
       }
     }
-
   }
 
   @Override
@@ -389,12 +398,7 @@ public class JGroupsMessenger implements Messenger {
     addressesWithIoExceptionsProcessed.clear();
 
     if (encrypt != null) {
-      try {
-        encrypt.installView(v);
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+      encrypt.installView(v);
     }
   }
   
@@ -607,20 +611,7 @@ public class JGroupsMessenger implements Messenger {
   public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) {
     return send(msg, false);
   }
-
-  @Override
-  public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
-    if (this.encrypt != null) {
-      try {
-        this.encrypt.installView(alternateView);
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-    }
-    return send(msg, true);
-  }
-
+    
   @Override
   public Set<InternalDistributedMember> send(DistributionMessage msg) {
     return send(msg, true);
@@ -670,7 +661,7 @@ public class JGroupsMessenger implements Messenger {
     if (useMcast) {
 
       long startSer = theStats.startMsgSerialization();
-      Message jmsg = createJGMessage(msg, local, null, Version.CURRENT_ORDINAL);
+      Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
       theStats.endMsgSerialization(startSer);
 
       Exception problem = null;
@@ -712,7 +703,7 @@ public class JGroupsMessenger implements Messenger {
     } // useMcast
     else { // ! useMcast
       int len = destinations.length;
-      List<InternalDistributedMember> calculatedMembers; // explicit list of members
+      List<GMSMember> calculatedMembers; // explicit list of members
       int calculatedLen; // == calculatedMembers.len
       if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all
         // Grab a copy of the current membership
@@ -720,40 +711,51 @@ public class JGroupsMessenger implements Messenger {
 
         // Construct the list
         calculatedLen = v.size();
-        calculatedMembers = new LinkedList<InternalDistributedMember>();
+        calculatedMembers = new LinkedList<GMSMember>();
         for (int i = 0; i < calculatedLen; i ++) {
           InternalDistributedMember m = (InternalDistributedMember)v.get(i);
-          calculatedMembers.add(m);
+          calculatedMembers.add((GMSMember)m.getNetMember());
         }
       } // send to all
       else { // send to explicit list
         calculatedLen = len;
-        calculatedMembers = new LinkedList<>();
+        calculatedMembers = new LinkedList<GMSMember>();
         for (int i = 0; i < calculatedLen; i ++) {
-          calculatedMembers.add(destinations[i]);
+          calculatedMembers.add((GMSMember)destinations[i].getNetMember());
         }
       } // send to explicit list
       Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
       long startSer = theStats.startMsgSerialization();
-
-      boolean encode = (encrypt != null);
-
       boolean firstMessage = true;
+      for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) {
+        GMSMember mbr = it.next();
+        short version = mbr.getVersionOrdinal();
+        if ( !messages.containsKey(version) ) {
+          Message jmsg = createJGMessage(msg, local, version);
+          messages.put(version, jmsg);
+          if (firstMessage) {
+            theStats.incSentBytes(jmsg.getLength());
+            firstMessage = false;
+          }
+        }
+      }
+      theStats.endMsgSerialization(startSer);
       Collections.shuffle(calculatedMembers);
       int i=0;
-      for (InternalDistributedMember mbr: calculatedMembers) {
-        short version = mbr.getNetMember().getVersionOrdinal();
+      for (GMSMember mbr: calculatedMembers) {
         JGAddress to = new JGAddress(mbr);
-        Message jmsg = createJGMessage(msg, local, mbr, version);
+        short version = mbr.getVersionOrdinal();
+        Message jmsg = (Message)messages.get(version);
         Exception problem = null;
         try {
+          Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
           if (!reliably) {
             jmsg.setFlag(Message.Flag.NO_RELIABILITY);
           }
-          jmsg.setDest(to);
-          jmsg.setSrc(this.jgAddress);
+          tmp.setDest(to);
+          tmp.setSrc(this.jgAddress);
           logger.trace("Unicasting to {}", to);
-          myChannel.send(jmsg);
+          myChannel.send(tmp);
         }
         catch (Exception e) {
           problem = e;
@@ -810,7 +812,7 @@ public class JGroupsMessenger implements Messenger {
    * @param version the version of the recipient
    * @return the new message
    */
-  Message createJGMessage(DistributionMessage gfmsg, JGAddress src, InternalDistributedMember recipient, short version) {
+  Message createJGMessage(DistributionMessage gfmsg, JGAddress src, short version) {
     if(gfmsg instanceof DirectReplyMessage) {
       ((DirectReplyMessage) gfmsg).registerProcessor();
     }
@@ -820,35 +822,17 @@ public class JGroupsMessenger implements Messenger {
     setMessageFlags(gfmsg, msg);
     try {
       long start = services.getStatistics().startMsgSerialization();
-      HeapDataOutputStream out_stream =
-        new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+      byte[] messageBytes = null;
+      HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
       Version.CURRENT.writeOrdinal(out_stream, true);
-      DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
-      boolean encode = encrypt != null && recipient != null;
-      if (encode) {
-        // Coordinator doesn't know our publicKey for a JoinRequest
-        if (gfmsg.getDSFID() == JOIN_REQUEST || gfmsg.getDSFID() == JOIN_RESPONSE) {
-          encode = false;
-        }
-      }
-      if (encode) {
-        logger.info("encoding {}", gfmsg);
-        try {
-          out_stream.writeBoolean(true); // TODO we should have flag bits
-          HeapDataOutputStream out_stream2 =
-            new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
-          DataSerializer.writeObject(gfmsg, out_stream2);
-          byte[] payload = out_stream2.toByteArray();
-          payload = encrypt.encryptData(payload, recipient);
-          DataSerializer.writeByteArray(payload, out_stream);
-        } catch (Exception e) {
-          throw new GemFireIOException("unable to send message", e);
-        }
+      if(encrypt != null) {
+        out_stream.writeBoolean(true);
+        writeEncryptedMessage(gfmsg, version, out_stream);                
       } else {
-        logger.info("not encoding {}", gfmsg);
         out_stream.writeBoolean(false);
-        DataSerializer.writeObject(gfmsg, out_stream);
+        serializeMessage(gfmsg, out_stream);
       }
+      
       msg.setBuffer(out_stream.toByteArray());
       services.getStatistics().endMsgSerialization(start);
     }
@@ -862,9 +846,82 @@ public class JGroupsMessenger implements Messenger {
         ioe.initCause(ex);
         throw ioe;
       }
+    } catch (Exception ex) {
+      logger.warn("Error serializing message", ex);
+      GemFireIOException ioe = new
+          GemFireIOException("Error serializing message");
+      ioe.initCause(ex);
+      throw ioe;
     }
     return msg;
   }
+  
+  void writeEncryptedMessage(DistributionMessage gfmsg, short version, HeapDataOutputStream out) throws Exception {
+    InternalDataSerializer.writeDSFIDHeader(gfmsg.getDSFID(), out);
+    byte[] pk = null;
+    int requestId = 0;
+    InternalDistributedMember pkMbr = null;
+    switch (gfmsg.getDSFID()) {
+    case FIND_COORDINATOR_REQ:
+    case JOIN_REQUEST:
+      // need to append our public key
+      pk = encrypt.getPublicKey(localAddress);
+      
+      pkMbr = gfmsg.getRecipients()[0];      
+      requestId = getRequestId(gfmsg, true);
+      break;
+    case FIND_COORDINATOR_RESP:
+    case JOIN_RESPONSE:
+      pkMbr = gfmsg.getRecipients()[0];
+      requestId = getRequestId(gfmsg, false);
+    default:
+      break;
+    }
+    logger.debug("writeEncryptedMessage gfmsg.getDSFID() = {}  for {} with requestid  {}", gfmsg.getDSFID(), pkMbr, requestId);
+    out.writeInt(requestId);
+    if (pk != null) {      
+      InternalDataSerializer.writeByteArray(pk, out);
+    }
+
+    HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+    byte[] messageBytes = serializeMessage(gfmsg, out_stream);
+    
+    if (pkMbr != null) {
+      // using members private key
+      messageBytes = encrypt.encryptData(messageBytes, pkMbr);
+    } else {
+      // using cluster secret key
+      messageBytes = encrypt.encryptData(messageBytes);
+    }
+    InternalDataSerializer.writeByteArray(messageBytes, out);
+  }
+  
+  int getRequestId(DistributionMessage gfmsg, boolean add) {
+    int requestId = 0;
+    if (gfmsg instanceof FindCoordinatorRequest) {
+      requestId = ((FindCoordinatorRequest) gfmsg).getRequestId();
+    } else if (gfmsg instanceof JoinRequestMessage) {
+      requestId = ((JoinRequestMessage) gfmsg).getRequestId();
+    } else if (gfmsg instanceof FindCoordinatorResponse) {
+      requestId = ((FindCoordinatorResponse) gfmsg).getRequestId();
+    } else if (gfmsg instanceof JoinResponseMessage) {
+      requestId = ((JoinResponseMessage) gfmsg).getRequestId();
+    }
+
+    if (add) {
+      addRequestId(requestId, gfmsg.getRecipients()[0]);
+    }
+
+    return requestId;
+  }
+  
+  byte[] serializeMessage(DistributionMessage gfmsg, HeapDataOutputStream out_stream) throws IOException {
+    
+    DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
+    DataSerializer.writeObject(gfmsg, out_stream);
+    
+    return out_stream.toByteArray();
+  }
 
   void setMessageFlags(DistributionMessage gfmsg, Message msg) {
     // GemFire uses its own reply processors so there is no need
@@ -905,17 +962,14 @@ public class JGroupsMessenger implements Messenger {
       // as STABLE_GOSSIP
       logger.trace("message length is zero - ignoring");
       return null;
-    }
-
-    InternalDistributedMember sender = null;
+    }    
 
     Exception problem = null;
     byte[] buf = jgmsg.getRawBuffer();
     try {
       long start = services.getStatistics().startMsgDeserialization();
       
-
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf,
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, 
           jgmsg.getOffset(), jgmsg.getLength()));
 
       short ordinal = Version.readOrdinal(dis);
@@ -924,44 +978,33 @@ public class JGroupsMessenger implements Messenger {
         dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
             ordinal, true));
       }
-
-      GMSMember m = DataSerializer.readObject(dis);
-
-      sender = getMemberFromView(m, ordinal);
-
-      boolean encrypted = dis.readBoolean();
-
-      if (encrypted && encrypt != null) {
-        byte[] payload = DataSerializer.readByteArray(dis);
-        try {
-          payload = encrypt.decryptData(payload, sender);
-          dis = new DataInputStream(new ByteArrayInputStream(payload));
-          if (ordinal < Version.CURRENT_ORDINAL) {
-            dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
-              ordinal, true));
-          }
-        } catch (Exception e) {
-          throw new GemFireIOException("unable to receive message", e);
-        }
-      }
-
-      result = DataSerializer.readObject(dis);
-
-      DistributionMessage dm = (DistributionMessage)result;
+    
+      // read the encryption flag
+      boolean isEncrypted = dis.readBoolean();
+      
+      if(isEncrypted && encrypt == null) {
+        throw new GemFireConfigException("Got remote message as encrypted");
+      } 
       
-      // JoinRequestMessages are sent with an ID that may have been
-      // reused from a previous life by way of auto-reconnect,
-      // so we don't want to find a canonical reference for the
-      // request's sender ID
-      if (dm.getDSFID() == JOIN_REQUEST) {
-        sender = ((JoinRequestMessage)dm).getMemberID();
+      if(isEncrypted) {
+        result = readEncryptedMessage(dis, ordinal, encrypt);
+      } else {
+        GMSMember m = DataSerializer.readObject(dis);
+        result = DataSerializer.readObject(dis);
+        DistributionMessage dm = (DistributionMessage) result;
+        setSender(dm, m, ordinal);
       }
-      ((DistributionMessage)result).setSender(sender);
+
       services.getStatistics().endMsgDeserialization(start);
     }
     catch (ClassNotFoundException | IOException | RuntimeException e) {
       problem = e;
+    } catch (Exception e) {
+      problem = e;
     }
     if (problem != null) {
       logger.error(LocalizedMessage.create(
@@ -972,37 +1015,113 @@ public class JGroupsMessenger implements Messenger {
     return result;
   }
   
-  
-  /** look for certain messages that may need to be altered before being sent */
-  void filterOutgoingMessage(DistributionMessage m) {
-    switch (m.getDSFID()) {
+  void setSender(DistributionMessage dm, GMSMember m, short ordinal) {
+    InternalDistributedMember sender = null;
+    // JoinRequestMessages are sent with an ID that may have been
+    // reused from a previous life by way of auto-reconnect,
+    // so we don't want to find a canonical reference for the
+    // request's sender ID
+    if (dm.getDSFID() == JOIN_REQUEST) {
+      sender = ((JoinRequestMessage)dm).getMemberID();
+    } else {
+      sender = getMemberFromView(m, ordinal);
+    }
+    dm.setSender(sender);
+  }
+
+  @SuppressWarnings("resource")
+  DistributionMessage readEncryptedMessage(DataInputStream dis, short ordinal, GMSEncrypt encryptLocal) throws Exception {
+    int dfsid = InternalDataSerializer.readDSFIDHeader(dis);
+    int requestId = dis.readInt();
+
+    try {
+      // TODO: the request id may be unnecessary here; a flag bit indicating
+      // that a public key is appended could serve the same purpose
+
+      logger.debug("readEncryptedMessage: dsfid " + dfsid + ", requestId " + requestId
+          + ", local member " + this.localAddress);
+      InternalDistributedMember pkMbr = null;
+      boolean readPK = false;
+      switch (dfsid) {
+      case FIND_COORDINATOR_REQ:
       case JOIN_REQUEST:
-        if (encrypt == null) {
-          break;
-        }
-        JoinRequestMessage joinMsg = (JoinRequestMessage)m;
-        joinMsg.setPublicKey(encrypt.getPublicKeyBytes());
+        readPK = true;
         break;
-
+      case FIND_COORDINATOR_RESP:
       case JOIN_RESPONSE:
-        JoinResponseMessage jrsp = (JoinResponseMessage)m;
+        // responses carry the requestId of the original request; look up the
+        // member the request went to so the matching key can be used
+        pkMbr = getRequestedMember(requestId);
+        break;
+      }
+
+      byte[] data;
+
+      byte[] pk = null;
+
+      if (readPK) {
+        // requests carry the sender's public key prefixed to the ciphertext
+        pk = InternalDataSerializer.readByteArray(dis);
+        data = InternalDataSerializer.readByteArray(dis);
+        // decrypt using the public key read from the stream
+        data = encryptLocal.decryptData(data, pk);
+      } else {
+        data = InternalDataSerializer.readByteArray(dis);
+        if (pkMbr != null) {
+          // decrypt using the stored public key of the responding member
+          data = encryptLocal.decryptData(data, pkMbr);
+        } else {
+          // decrypt using the shared cluster key
+          data = encryptLocal.decryptData(data);
+        }
+      }
+
+      {
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+        if (ordinal < Version.CURRENT_ORDINAL) {
+          in = new VersionedDataInputStream(in, Version.fromOrdinalNoThrow(ordinal, true));
+        }
+
+        GMSMember m = DataSerializer.readObject(in);
+
+        DistributionMessage result = (DistributionMessage) DataSerializer.readObject(in);
+
+        setSender(result, m, ordinal);
 
-        if (jrsp.getRejectionMessage() == null
+        if (pk != null) {
+          encryptLocal.setPublicKey(pk, result.getSender());
+        }
+
+        return result;
+      }
+    } catch (Exception e) {
+      throw new Exception("Failed to read encrypted message with dsfid " + dfsid, e);
+    }
+
+  }
+  
+  /** look for certain messages that may need to be altered before being sent */
+  void filterOutgoingMessage(DistributionMessage m) {
+    switch (m.getDSFID()) {
+    case JOIN_RESPONSE:
+      JoinResponseMessage jrsp = (JoinResponseMessage)m;
+      
+      if (jrsp.getRejectionMessage() == null
           &&  services.getConfig().getTransport().isMcastEnabled()) {
-          // get the multicast message digest and pass it with the join response
-          Digest digest = (Digest)this.myChannel.getProtocolStack()
+        // get the multicast message digest and pass it with the join response
+        Digest digest = (Digest)this.myChannel.getProtocolStack()
             .getTopProtocol().down(Event.GET_DIGEST_EVT);
-          HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
-          try {
-            digest.writeTo(hdos);
-          } catch (Exception e) {
-            logger.fatal("Unable to serialize JGroups messaging digest", e);
-          }
-          jrsp.setMessengerData(hdos.toByteArray());
+        HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
+        try {
+          digest.writeTo(hdos);
+        } catch (Exception e) {
+          logger.fatal("Unable to serialize JGroups messaging digest", e);
         }
-        break;
-      default:
-        break;
+        jrsp.setMessengerData(hdos.toByteArray());
+      }
+      break;
+    default:
+      break;
     }
   }
   
@@ -1190,7 +1309,7 @@ public class JGroupsMessenger implements Messenger {
           if (clazz.isAssignableFrom(msgClazz)) {
             h = handlers.get(clazz);
             handlers.put(msg.getClass(), h);
-            break;
+            break;              
           }
         }
       }
@@ -1201,6 +1320,72 @@ public class JGroupsMessenger implements Messenger {
     }
   }
   
+  @Override
+  public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
+    if (this.encrypt != null) {
+      this.encrypt.installView(alternateView);      
+    }
+    return send(msg, true);
+  }
+
+  @Override
+  public byte[] getPublickey(InternalDistributedMember mbr) {
+    if (encrypt != null) {
+      return encrypt.getPublicKey(mbr);
+    }
+    return null;
+  }
+
+  @Override
+  public void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
+    if (encrypt != null) {
+      logger.debug("Setting pK for member " + mbr);
+      encrypt.setPublicKey(publickey, mbr);
+    }
+  }
+
+  @Override
+  public void setClusterSecretKey(byte[] clusterSecretKey) {
+    if (encrypt != null) {
+      logger.debug("Setting cluster key");
+      encrypt.addClusterKey(clusterSecretKey);
+    }
+  }
+
+  @Override
+  public byte[] getClusterSecretKey() {
+    if (encrypt != null) {
+      return encrypt.getClusterSecretKey();
+    }
+    return null;
+  }
+
+  private Random randomId = new Random();
+  private HashMap<Integer, InternalDistributedMember> requestIdVsRecipients = new HashMap<>();
   
+  InternalDistributedMember getRequestedMember(int requestId) {
+    // TODO: if no response ever arrives, this entry is never removed and the map leaks
+    return requestIdVsRecipients.remove(requestId);
+  }
+  
+  void addRequestId(int requestId, InternalDistributedMember mbr) {
+    requestIdVsRecipients.put(requestId, mbr);
+  }
   
+  @Override
+  public int getRequestId() {
+    return randomId.nextInt();
+  }
+
+  @Override
+  public void initClusterKey() {
+    if (encrypt != null) {
+      try {
+        logger.debug("Initializing cluster key");
+        encrypt.initClusterSecretKey();
+      } catch (Exception e) {
+        throw new RuntimeException("unable to create cluster key", e);
+      }
+    }
+  }
 }
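All of the new Messenger methods above share one guard: when encryption is not
configured, the encrypt field is null and each call degrades to a no-op or a
null result. A self-contained toy of that shape follows; ToyEncrypt below is
invented for illustration and is not the real GMSEncrypt:

    // Toy illustration of the optional-encryption guard pattern.
    class ToyEncrypt {
      private byte[] clusterKey;
      void initClusterSecretKey() { clusterKey = new byte[] {1, 2, 3}; }
      byte[] getClusterSecretKey() { return clusterKey; }
    }

    class ToyMessenger {
      private final ToyEncrypt encrypt; // null unless encryption is configured
      ToyMessenger(ToyEncrypt e) { encrypt = e; }

      void initClusterKey() {
        if (encrypt != null) {
          encrypt.initClusterSecretKey(); // coordinator generates the shared key
        }
      }

      byte[] getClusterSecretKey() {
        return encrypt == null ? null : encrypt.getClusterSecretKey();
      }

      public static void main(String[] args) {
        ToyMessenger plain = new ToyMessenger(null);
        plain.initClusterKey();                          // harmless no-op
        System.out.println(plain.getClusterSecretKey()); // null

        ToyMessenger secure = new ToyMessenger(new ToyEncrypt());
        secure.initClusterKey();
        System.out.println(secure.getClusterSecretKey().length); // 3
      }
    }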

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
index bff592b..4574292 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
@@ -2751,6 +2751,24 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
     }
   }
   
+  public static int readDSFIDHeader(final DataInput in)
+      throws IOException, ClassNotFoundException {
+    checkIn(in);
+    byte header = in.readByte();
+    if (header == DS_FIXED_ID_BYTE) {
+      return in.readByte();
+    } else if (header == DS_FIXED_ID_SHORT) {
+      return in.readShort();
+    } else if (header == DS_NO_FIXED_ID) {
+      return Integer.MAX_VALUE; // TODO: verify - no fixed id exists for this header
+    } else if (header == DS_FIXED_ID_INT) {
+      return in.readInt();
+    } else {
+      throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
+    }
+  }
+  
   /**
    * Reads an instance of <code>String</code> from a
    * <code>DataInput</code> given the header byte already being read.
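For symmetry, the header that readDSFIDHeader decodes would be produced by
something like the sketch below. The real header constants are defined in
Geode's serialization code and their values are not visible in this diff, so
placeholders are used here:

    import java.io.DataOutput;
    import java.io.IOException;

    class DsfidHeaderSketch {
      // placeholder values, not the real Geode constants
      static final byte DS_FIXED_ID_BYTE = 1, DS_FIXED_ID_SHORT = 2,
                        DS_FIXED_ID_INT = 3, DS_NO_FIXED_ID = 4;

      static void writeDSFIDHeader(int dsfid, DataOutput out) throws IOException {
        if (dsfid >= Byte.MIN_VALUE && dsfid <= Byte.MAX_VALUE) {
          out.writeByte(DS_FIXED_ID_BYTE); // smallest encoding that fits
          out.writeByte(dsfid);
        } else if (dsfid >= Short.MIN_VALUE && dsfid <= Short.MAX_VALUE) {
          out.writeByte(DS_FIXED_ID_SHORT);
          out.writeShort(dsfid);
        } else {
          out.writeByte(DS_FIXED_ID_INT);
          out.writeInt(dsfid);
        }
      }
    }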

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
index 011b8f5..582149c 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
@@ -256,6 +256,7 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
     p.put(DistributionConfig.MCAST_TTL_NAME, mcastttl);
     p.put(DistributionConfig.LOCATORS_NAME, "localhost[" + locatorPort +"]");
     p.put(DistributionConfig.LOG_LEVEL_NAME, "info");
+    p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
     return p;
   } 
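The added property line turns on Diffie-Hellman based encryption for these
test members; the value is algorithm:keysize. Outside the test harness the
same setting would look like the sketch below, assuming
"security-client-dhalgo" is the property string behind
DistributionConfig.SECURITY_CLIENT_DHALGO_NAME:

    import java.util.Properties;

    class DhAlgoSketch {
      static Properties secureProps() {
        Properties p = new Properties();
        // AES with a 128-bit key for the negotiated session cipher
        p.setProperty("security-client-dhalgo", "AES:128");
        return p;
      }
    }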
   
@@ -288,6 +289,7 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
         locatorProps.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastport);
         locatorProps.setProperty(DistributionConfig.MCAST_TTL_NAME, mcastttl);
         locatorProps.setProperty(DistributionConfig.LOG_LEVEL_NAME, "info");
+        locatorProps.setProperty(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
         //locatorProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
         try {
           final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locatorPort, null, null,


[3/3] incubator-geode git commit: Merge branch 'feature/GEODE-1372' of https://git-wip-us.apache.org/repos/asf/incubator-geode into GEODE-1372

Posted by hi...@apache.org.
Merge branch 'feature/GEODE-1372' of https://git-wip-us.apache.org/repos/asf/incubator-geode into GEODE-1372


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/667c4259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/667c4259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/667c4259

Branch: refs/heads/feature/GEODE-1372
Commit: 667c4259f4dc399766b37b03b999e70fe10761b8
Parents: b6a7344 d6fb783
Author: Hitesh Khamesra <hi...@yahoo.com>
Authored: Wed Jun 1 15:28:18 2016 -0700
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Wed Jun 1 15:28:18 2016 -0700

----------------------------------------------------------------------
 .../membership/gms/messenger/GMSEncryptJUnitTest.java        | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/667c4259/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
----------------------------------------------------------------------