You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/05/19 19:27:36 UTC

[31/36] incubator-geode git commit: GEODE-1372 Geode UDP communications are not secure when SSL is configured

GEODE-1372 Geode UDP communications are not secure when SSL is configured

This branch contains Diffie-Hellman encoding of UDP communications in Geode
using the encryption scheme that is already available for client/server
communications.  The current implementation uses security-client-dhalgo
to enable encryption.

Membership views hold the public keys of peers.  GMSEncrypt is a new
object that is held by JGroupsMessenger and is used to perform the
encryption/decryption.

GMSJoinLeave is modified to send a new member's public key to the
membership coordinator.  The coordinator sends its public key back prior
to announcing the new membership view with the new member.  This should
be changed to have the coordinator's public key be sent to the joining
member and the coordinator should get the new member's public key from
a locator as well.

GMSEncrypt needs to be changed to record time spent encrypting and
decrypting in DistributionStats as well as the number of encryptions/decryptions
performed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a28b223a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a28b223a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a28b223a

Branch: refs/heads/feature/GEODE-1372
Commit: a28b223a60c9ede73776cca245c9f650b4db66c1
Parents: 89b91cb
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon May 9 15:59:33 2016 -0700
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Thu May 19 10:18:23 2016 -0700

----------------------------------------------------------------------
 .../internal/membership/NetView.java            |  33 +-
 .../membership/gms/interfaces/Messenger.java    |   8 +
 .../membership/gms/membership/GMSJoinLeave.java |  30 +-
 .../gms/messages/JoinRequestMessage.java        |  15 +-
 .../membership/gms/messenger/GMSEncrypt.java    | 333 +++++++++++++++++++
 .../gms/messenger/JGroupsMessenger.java         | 160 ++++++---
 .../membership/MembershipJUnitTest.java         | 138 ++++++++
 .../gms/messenger/GMSEncryptJUnitTest.java      |  97 ++++++
 .../messenger/JGroupsMessengerJUnitTest.java    |   6 +-
 9 files changed, 748 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
old mode 100644
new mode 100755
index af4aec3..365a193
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -19,15 +19,8 @@ package com.gemstone.gemfire.distributed.internal.membership;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.gemstone.gemfire.internal.logging.LogService;
 import org.apache.logging.log4j.Logger;
@@ -50,6 +43,8 @@ public class NetView implements DataSerializableFixedID {
 
   private int viewId;
   private List<InternalDistributedMember> members;
+  // TODO this should be a List
+  private Map<InternalDistributedMember, Object> publicKeys = new ConcurrentHashMap<>();
   private int[] failureDetectionPorts = new int[10];
   private Set<InternalDistributedMember> shutdownMembers;
   private Set<InternalDistributedMember> crashedMembers;
@@ -114,6 +109,7 @@ public class NetView implements DataSerializableFixedID {
     this.hashedMembers = new HashSet<InternalDistributedMember>(other.members);
     this.failureDetectionPorts = new int[other.failureDetectionPorts.length];
     System.arraycopy(other.failureDetectionPorts, 0, this.failureDetectionPorts, 0, other.failureDetectionPorts.length);
+    this.publicKeys = new HashMap<>(other.publicKeys);
     this.shutdownMembers = new HashSet<InternalDistributedMember>(other.shutdownMembers);
     this.crashedMembers = new HashSet<InternalDistributedMember>(other.crashedMembers);
   }
@@ -142,6 +138,19 @@ public class NetView implements DataSerializableFixedID {
     this.creator = creator;
   }
   
+  public Object getPublicKey(InternalDistributedMember mbr) {
+    return publicKeys.get(mbr);
+  }
+
+  public void setPublicKey(InternalDistributedMember mbr, Object key) {
+    publicKeys.put(mbr, key);
+  }
+
+  public void setPublicKeys(NetView otherView) {
+    this.publicKeys.putAll(otherView.publicKeys);
+  }
+
+
   public int[] getFailureDetectionPorts() {
     return this.failureDetectionPorts;
   }
@@ -153,7 +162,8 @@ public class NetView implements DataSerializableFixedID {
     }
     return failureDetectionPorts[idx];
   }
-  
+
+
   public void setFailureDetectionPort(InternalDistributedMember mbr, int port) {
     int idx = members.indexOf(mbr);
     if (idx < 0) {
@@ -575,6 +585,8 @@ public class NetView implements DataSerializableFixedID {
     InternalDataSerializer.writeSet(shutdownMembers, out);
     InternalDataSerializer.writeSet(crashedMembers, out);
     DataSerializer.writeIntArray(failureDetectionPorts, out);
+    // TODO expensive serialization
+    DataSerializer.writeObject(publicKeys, out);
   }
 
   @Override
@@ -586,6 +598,7 @@ public class NetView implements DataSerializableFixedID {
     shutdownMembers = InternalDataSerializer.readHashSet(in);
     crashedMembers = InternalDataSerializer.readHashSet(in);
     failureDetectionPorts = DataSerializer.readIntArray(in);
+    publicKeys = DataSerializer.readObject(in);
   }
 
   /** this will deserialize as an ArrayList */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index 5bb6c4b..e10f325 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -21,6 +21,7 @@ import java.util.Set;
 
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
 import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
 
 public interface Messenger extends Service {
@@ -30,6 +31,13 @@ public interface Messenger extends Service {
   void addHandler(Class c, MessageHandler h);
 
   /**
+   * sends an asynchronous message when the membership view may not have
+   * been established.  Returns destinations that did not
+   * receive the message due to no longer being in the view
+   */
+  Set<InternalDistributedMember> send(DistributionMessage m, NetView alternateView);
+
+  /**
    * sends an asynchronous message.  Returns destinations that did not
    * receive the message due to no longer being in the view
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index e12740e..d70884a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -362,7 +362,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
       int port = services.getHealthMonitor().getFailureDetectionPort();
       JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port);
-      services.getMessenger().send(req);
+      services.getMessenger().send(req, state.view);
     }
 
     JoinResponseMessage response = null;
@@ -770,6 +770,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
 
+  private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
+    for (InternalDistributedMember mbr : newMbrs) {
+      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+      services.getMessenger().send(response);
+    }
+  }
+
+
   boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException {
     return sendView(view, newMembers, true, this.prepareProcessor);
   }
@@ -833,7 +841,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     pendingRemovals.removeAll(view.getCrashedMembers());
     viewReplyProcessor.initialize(id, responders);
     viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals);
-    services.getMessenger().send(msg);
+    services.getMessenger().send(msg, view);
 
     // only wait for responses during preparation
     if (preparing) {
@@ -892,22 +900,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
       } else {
         this.preparedView = view;
-        ackView(m);
         if (viewContainsMyUnjoinedAddress) {
           installView(view); // this will notifyAll the joinResponse
         }
+        ackView(m);
       }
     } else { // !preparing
       if (isJoined && currentView != null && !view.contains(this.localAddress)) {
         logger.fatal("This member is no longer in the membership view.  My ID is {} and the new view is {}", localAddress, view);
         forceDisconnect("This node is no longer in the membership view");
       } else {
-        if (!m.isRebroadcast()) { // no need to ack a rebroadcast view
-          ackView(m);
-        }
         if (isJoined || viewContainsMyUnjoinedAddress) {
           installView(view);
         }
+        if (!m.isRebroadcast()) { // no need to ack a rebroadcast view
+          ackView(m);
+        }
       }
     }
   }
@@ -919,7 +927,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   private void ackView(InstallViewMessage m) {
     if (!playingDead && m.getView().contains(m.getView().getCreator())) {
-      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
+      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()), m.getView());
     }
   }
 
@@ -1901,6 +1909,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         for (InternalDistributedMember newMember : newMembers) {
           newView.add(newMember);
           newView.setFailureDetectionPort(newMember, v.getFailureDetectionPort(newMember));
+          newView.setPublicKey(newMember, v.getPublicKey(newMember));
         }
 
         // use the new view as the initial view
@@ -2024,6 +2033,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     void createAndSendView(List<DistributionMessage> requests) throws InterruptedException {
       List<InternalDistributedMember> joinReqs = new ArrayList<>(10);
       Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10);
+      Map<InternalDistributedMember, Object> joinKeys = new HashMap<>(10);
       Set<InternalDistributedMember> leaveReqs = new HashSet<>(10);
       List<InternalDistributedMember> removalReqs = new ArrayList<>(10);
       List<String> removalReasons = new ArrayList<String>(10);
@@ -2057,6 +2067,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           if (!joinReqs.contains(mbr)) {
             joinReqs.add(mbr);
             joinPorts.put(mbr, port);
+            joinKeys.put(mbr, jmsg.getPublicKey());
           }
           break;
         case LEAVE_REQUEST_MESSAGE:
@@ -2126,10 +2137,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         for (InternalDistributedMember mbr : joinReqs) {
           if (mbrs.contains(mbr)) {
             newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
+            newView.setPublicKey(mbr, joinKeys.get(mbr));
           }
         }
         if (currentView != null) {
           newView.setFailureDetectionPorts(currentView);
+          newView.setPublicKeys(currentView);
         }
       }
 
@@ -2147,6 +2160,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       if (isShutdown()) {
         return;
       }
+
+      sendJoinResponses(joinReqs, newView);
+
       // send removal messages before installing the view so we stop
       // getting messages from members that have been kicked out
       sendRemoveMessages(removalReqs, removalReasons, newView, oldIDs);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 0e84e92..5545935 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -30,20 +30,25 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   private InternalDistributedMember memberID;
   private Object credentials;
   private int failureDetectionPort = -1;
+  private Object publicKey;
   
   public JoinRequestMessage(InternalDistributedMember coord,
-      InternalDistributedMember id, Object credentials, int fdPort) {
+                            InternalDistributedMember id, Object credentials, int fdPort) {
     super();
     setRecipient(coord);
     this.memberID = id;
     this.credentials = credentials;
+    this.publicKey = null;
     this.failureDetectionPort = fdPort;
   }
-  
   public JoinRequestMessage() {
     // no-arg constructor for serialization
   }
 
+  public void setPublicKey(Object key) {
+    this.publicKey = key;
+  }
+
   @Override
   public int getDSFID() {
     return JOIN_REQUEST;
@@ -61,6 +66,10 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   public Object getCredentials() {
     return credentials;
   }
+
+  public Object getPublicKey() {
+    return publicKey;
+  }
   
   @Override
   public String toString() {
@@ -76,6 +85,7 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   public void toData(DataOutput out) throws IOException {
     DataSerializer.writeObject(memberID, out);
     DataSerializer.writeObject(credentials, out);
+    DataSerializer.writeObject(publicKey, out);
     DataSerializer.writePrimitiveInt(failureDetectionPort, out);
     // preserve the multicast setting so the receiver can tell
     // if this is a mcast join request
@@ -86,6 +96,7 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     memberID = DataSerializer.readObject(in);
     credentials = DataSerializer.readObject(in);
+    publicKey = DataSerializer.readObject(in);
     failureDetectionPort = DataSerializer.readPrimitiveInt(in);
     setMulticast(in.readBoolean());
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
new file mode 100755
index 0000000..3f61453
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+
+import java.math.BigInteger;
+import java.security.*;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.X509EncodedKeySpec;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyAgreement;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.DHParameterSpec;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class GMSEncrypt {
+  public static long encodingsPerformed;
+  public static long decodingsPerformed;
+
+  private static final Logger logger = LogService.getLogger();
+
+  // Parameters for the Diffie-Hellman key exchange
+  private static final BigInteger dhP = new BigInteger(
+    "13528702063991073999718992897071702177131142188276542919088770094024269"
+      +  "73079899070080419278066109785292538223079165925365098181867673946"
+      +  "34756714063947534092593553024224277712367371302394452615862654308"
+      +  "11180902979719649450105660478776364198726078338308557022096810447"
+      +  "3500348898008043285865193451061481841186553");
+
+  private static final BigInteger dhG = new BigInteger(
+    "13058345680719715096166513407513969537624553636623932169016704425008150"
+      +  "56576152779768716554354314319087014857769741104157332735258102835"
+      +  "93126577393912282416840649805564834470583437473176415335737232689"
+      +  "81480201869671811010996732593655666464627559582258861254878896534"
+      +  "1273697569202082715873518528062345259949959");
+
+  private static final int dhL = 1023;
+
+  private  PrivateKey dhPrivateKey = null;
+
+  private  PublicKey dhPublicKey = null;
+
+  private  String dhSKAlgo = null;
+
+  private Services services;
+
+  private NetView view;
+
+  private Map<InternalDistributedMember, PeerEncryptor> memberToPeerEncryptor = new ConcurrentHashMap<>();
+
+
+  protected void installView(NetView view) {
+    this.view = view;
+    this.view.setPublicKey(services.getJoinLeave().getMemberID(), getPublicKeyBytes());
+    // TODO remove ciphers for departed members
+  }
+
+
+
+  public GMSEncrypt(Services services) throws  Exception {
+    this.services = services;
+    initDHKeys(services.getConfig().getDistributionConfig());
+  }
+
+  public byte[] decryptData(byte[] data, InternalDistributedMember member) throws Exception {
+    return getPeerEncryptor(member).decryptBytes(data);
+  }
+
+  public byte[] encryptData(byte[] data, InternalDistributedMember member) throws Exception {
+    return getPeerEncryptor(member).encryptBytes(data);
+  }
+
+  protected byte[] getPublicKeyBytes() {
+    return dhPublicKey.getEncoded();
+  }
+
+
+  /**
+   * Initialize the Diffie-Hellman keys. This method is not thread safe
+   */
+  private void initDHKeys(DistributionConfig config) throws Exception {
+
+    dhSKAlgo = config.getSecurityClientDHAlgo();
+    // Generate the key pair only when a non-blank Diffie-Hellman symmetric
+    // algorithm (security-client-dhalgo) has been configured; otherwise the
+    // private and public keys are left null.
+    if ((dhSKAlgo != null && dhSKAlgo.length() > 0)) {
+      KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DH");
+      DHParameterSpec dhSpec = new DHParameterSpec(dhP, dhG, dhL);
+      keyGen.initialize(dhSpec);
+      KeyPair keypair = keyGen.generateKeyPair();
+
+      // Get the generated public and private keys
+      dhPrivateKey = keypair.getPrivate();
+      dhPublicKey = keypair.getPublic();
+    }
+  }
+
+  protected synchronized PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception{
+    PeerEncryptor result = memberToPeerEncryptor.get(member);
+    if (result == null) {
+      result = createPeerEncryptor(member);
+    }
+    return result;
+  }
+
+  private PeerEncryptor createPeerEncryptor(InternalDistributedMember member) throws Exception {
+    byte[] peerKeyBytes = (byte[]) view.getPublicKey(member);
+    PeerEncryptor result = new PeerEncryptor(peerKeyBytes);
+    memberToPeerEncryptor.put(member, result);
+    return result;
+  }
+
+
+  private static int getKeySize(String skAlgo) {
+    // skAlgo may contain both the algorithm name and a key size, separated by ':'
+    int colIdx = skAlgo.indexOf(':');
+    String algoStr;
+    int algoKeySize = 0;
+    if (colIdx >= 0) {
+      algoStr = skAlgo.substring(0, colIdx);
+      algoKeySize = Integer.parseInt(skAlgo.substring(colIdx + 1));
+    }
+    else {
+      algoStr = skAlgo;
+    }
+    int keysize = -1;
+    if (algoStr.equalsIgnoreCase("DESede")) {
+      keysize = 24;
+    }
+    else if (algoStr.equalsIgnoreCase("Blowfish")) {
+      keysize = algoKeySize > 128 ? algoKeySize / 8 : 16;
+    }
+    else if (algoStr.equalsIgnoreCase("AES")) {
+      keysize = (algoKeySize != 192 && algoKeySize != 256) ? 16
+        : algoKeySize / 8;
+    }
+    return keysize;
+  }
+
+  private static String getDhAlgoStr(String skAlgo) {
+    int colIdx = skAlgo.indexOf(':');
+    String algoStr;
+    if (colIdx >= 0) {
+      algoStr = skAlgo.substring(0, colIdx);
+    }
+    else {
+      algoStr = skAlgo;
+    }
+    return algoStr;
+  }
+
+  private static int getBlockSize(String skAlgo) {
+    int blocksize = -1;
+    String algoStr = getDhAlgoStr(skAlgo);
+    if (algoStr.equalsIgnoreCase("DESede")) {
+      blocksize = 8;
+    }
+    else if (algoStr.equalsIgnoreCase("Blowfish")) {
+      blocksize = 8;
+    }
+    else if (algoStr.equalsIgnoreCase("AES")) {
+      blocksize = 16;
+    }
+    return blocksize;
+  }
+
+  static public byte[] encryptBytes(byte[] data, Cipher encrypt) throws Exception{
+    synchronized(GMSEncrypt.class) {
+      encodingsPerformed++;
+    }
+    return encrypt.doFinal(data);
+  }
+
+  static public byte[] decryptBytes(byte[] data, Cipher decrypt)
+    throws Exception{
+    try {
+      byte[] decryptBytes = decrypt.doFinal(data);
+      synchronized(GMSEncrypt.class) {
+        decodingsPerformed++;
+      }
+      return decryptBytes;
+    }catch(Exception ex) {
+      throw ex;
+    }
+  }
+
+
+  protected class PeerEncryptor {
+
+    private PublicKey peerPublicKey = null;
+
+    private String peerSKAlgo = null;
+
+    private Cipher _encrypt;
+
+    protected PeerEncryptor(byte[] peerPublicKeyBytes) throws NoSuchAlgorithmException, InvalidKeySpecException {
+      X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(peerPublicKeyBytes);
+      KeyFactory keyFact = KeyFactory.getInstance("DH");
+      //PublicKey pubKey = keyFact.generatePublic(x509KeySpec);
+      this.peerPublicKey = keyFact.generatePublic(x509KeySpec);
+    }
+
+    public byte [] encryptBytes(byte[] data) throws Exception {
+      String algo = null;
+      if (this.peerSKAlgo != null) {
+        algo = this.peerSKAlgo;
+      } else {
+        algo = dhSKAlgo;
+      }
+      return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo, this.peerPublicKey));
+    }
+
+    private Cipher getEncryptCipher(String dhSKAlgo, PublicKey publicKey)
+      throws Exception{
+      try {
+        if(_encrypt == null) {
+          KeyAgreement ka = KeyAgreement.getInstance("DH");
+          ka.init(dhPrivateKey);
+          ka.doPhase(publicKey, true);
+
+          Cipher encrypt;
+
+          int keysize = getKeySize(dhSKAlgo);
+          int blocksize = getBlockSize(dhSKAlgo);
+
+          if (keysize == -1 || blocksize == -1) {
+            SecretKey sKey = ka.generateSecret(dhSKAlgo);
+            encrypt = Cipher.getInstance(dhSKAlgo);
+            encrypt.init(Cipher.ENCRYPT_MODE, sKey);
+          }
+          else {
+            String dhAlgoStr = getDhAlgoStr(dhSKAlgo);
+
+            byte[] sKeyBytes = ka.generateSecret();
+            SecretKeySpec sks = new SecretKeySpec(sKeyBytes, 0, keysize, dhAlgoStr);
+            IvParameterSpec ivps = new IvParameterSpec(sKeyBytes, keysize, blocksize);
+
+            encrypt = Cipher.getInstance(dhAlgoStr + "/CBC/PKCS5Padding");
+            encrypt.init(Cipher.ENCRYPT_MODE, sks, ivps);
+          }
+          _encrypt = encrypt;
+        }
+      }catch(Exception ex) {
+        throw ex;
+      }
+      return _encrypt;
+    }
+
+
+    public byte[] decryptBytes(byte[] data) throws Exception
+    {
+      String algo = null;
+      if (this.peerSKAlgo != null) {
+        algo = this.peerSKAlgo;
+      } else {
+        algo = dhSKAlgo;
+      }
+      Cipher c = getDecryptCipher(algo, this.peerPublicKey);
+      return GMSEncrypt.decryptBytes(data, c);
+
+    }
+
+
+
+    private Cipher _decrypt = null;
+
+    private Cipher getDecryptCipher( String dhSKAlgo, PublicKey publicKey)
+      throws Exception{
+      if(_decrypt == null) {
+        try {
+          KeyAgreement ka = KeyAgreement.getInstance("DH");
+          ka.init(dhPrivateKey);
+          ka.doPhase(publicKey, true);
+
+          Cipher decrypt;
+
+          int keysize = getKeySize(dhSKAlgo);
+          int blocksize = getBlockSize(dhSKAlgo);
+
+          if (keysize == -1 || blocksize == -1) {
+            SecretKey sKey = ka.generateSecret(dhSKAlgo);
+            decrypt = Cipher.getInstance(dhSKAlgo);
+            decrypt.init(Cipher.DECRYPT_MODE, sKey);
+          }
+          else {
+            String algoStr = getDhAlgoStr(dhSKAlgo);
+
+            byte[] sKeyBytes = ka.generateSecret();
+            SecretKeySpec sks = new SecretKeySpec(sKeyBytes, 0, keysize, algoStr);
+            IvParameterSpec ivps = new IvParameterSpec(sKeyBytes, keysize, blocksize);
+
+            decrypt = Cipher.getInstance(algoStr + "/CBC/PKCS5Padding");
+            decrypt.init(Cipher.DECRYPT_MODE, sks, ivps);
+          }
+
+          _decrypt = decrypt;
+        }catch(Exception ex) {
+          throw ex;
+        }
+      }
+      return _decrypt;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 0460964..25a9c43 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -149,6 +149,8 @@ public class JGroupsMessenger implements Messenger {
     ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class);
   }
 
+  private GMSEncrypt encrypt;
+
   @Override
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   public void init(Services s) {
@@ -250,7 +252,15 @@ public class JGroupsMessenger implements Messenger {
     properties = replaceStrings(properties, "FC_MAX_BLOCK", ""+dc.getMcastFlowControl().getRechargeBlockMs());
 
     this.jgStackConfig = properties;
-    
+
+    if ( !dc.getSecurityClientDHAlgo().isEmpty() ) {
+      try {
+        this.encrypt = new GMSEncrypt(services);
+      } catch (Exception e) {
+        throw new GemFireConfigException("problem initializing encryption protocol", e);
+      }
+    }
+
   }
 
   @Override
@@ -377,6 +387,10 @@ public class JGroupsMessenger implements Messenger {
     this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
 
     addressesWithIoExceptionsProcessed.clear();
+
+    if (encrypt != null) {
+//      encrypt.installView(v);
+    }
   }
   
 
@@ -588,7 +602,15 @@ public class JGroupsMessenger implements Messenger {
   public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) {
     return send(msg, false);
   }
-    
+
+  @Override
+  public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
+    if (this.encrypt != null) {
+     // this.encrypt.installView(alternateView);
+    }
+    return send(msg, true);
+  }
+
   @Override
   public Set<InternalDistributedMember> send(DistributionMessage msg) {
     return send(msg, true);
@@ -638,7 +660,7 @@ public class JGroupsMessenger implements Messenger {
     if (useMcast) {
 
       long startSer = theStats.startMsgSerialization();
-      Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
+      Message jmsg = createJGMessage(msg, local, null, Version.CURRENT_ORDINAL);
       theStats.endMsgSerialization(startSer);
 
       Exception problem = null;
@@ -680,7 +702,7 @@ public class JGroupsMessenger implements Messenger {
     } // useMcast
     else { // ! useMcast
       int len = destinations.length;
-      List<GMSMember> calculatedMembers; // explicit list of members
+      List<InternalDistributedMember> calculatedMembers; // explicit list of members
       int calculatedLen; // == calculatedMembers.len
       if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all
         // Grab a copy of the current membership
@@ -688,51 +710,40 @@ public class JGroupsMessenger implements Messenger {
 
         // Construct the list
         calculatedLen = v.size();
-        calculatedMembers = new LinkedList<GMSMember>();
+        calculatedMembers = new LinkedList<InternalDistributedMember>();
         for (int i = 0; i < calculatedLen; i ++) {
           InternalDistributedMember m = (InternalDistributedMember)v.get(i);
-          calculatedMembers.add((GMSMember)m.getNetMember());
+          calculatedMembers.add(m);
         }
       } // send to all
       else { // send to explicit list
         calculatedLen = len;
-        calculatedMembers = new LinkedList<GMSMember>();
+        calculatedMembers = new LinkedList<>();
         for (int i = 0; i < calculatedLen; i ++) {
-          calculatedMembers.add((GMSMember)destinations[i].getNetMember());
+          calculatedMembers.add(destinations[i]);
         }
       } // send to explicit list
       Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
       long startSer = theStats.startMsgSerialization();
+
+      boolean encode = (encrypt != null);
+
       boolean firstMessage = true;
-      for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) {
-        GMSMember mbr = it.next();
-        short version = mbr.getVersionOrdinal();
-        if ( !messages.containsKey(version) ) {
-          Message jmsg = createJGMessage(msg, local, version);
-          messages.put(version, jmsg);
-          if (firstMessage) {
-            theStats.incSentBytes(jmsg.getLength());
-            firstMessage = false;
-          }
-        }
-      }
-      theStats.endMsgSerialization(startSer);
       Collections.shuffle(calculatedMembers);
       int i=0;
-      for (GMSMember mbr: calculatedMembers) {
+      for (InternalDistributedMember mbr: calculatedMembers) {
+        short version = mbr.getNetMember().getVersionOrdinal();
         JGAddress to = new JGAddress(mbr);
-        short version = mbr.getVersionOrdinal();
-        Message jmsg = (Message)messages.get(version);
+        Message jmsg = createJGMessage(msg, local, mbr, version);
         Exception problem = null;
         try {
-          Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
           if (!reliably) {
             jmsg.setFlag(Message.Flag.NO_RELIABILITY);
           }
-          tmp.setDest(to);
-          tmp.setSrc(this.jgAddress);
+          jmsg.setDest(to);
+          jmsg.setSrc(this.jgAddress);
           logger.trace("Unicasting to {}", to);
-          myChannel.send(tmp);
+          myChannel.send(jmsg);
         }
         catch (Exception e) {
           problem = e;
@@ -789,7 +800,7 @@ public class JGroupsMessenger implements Messenger {
    * @param version the version of the recipient
    * @return the new message
    */
-  Message createJGMessage(DistributionMessage gfmsg, JGAddress src, short version) {
+  Message createJGMessage(DistributionMessage gfmsg, JGAddress src, InternalDistributedMember recipient, short version) {
     if(gfmsg instanceof DirectReplyMessage) {
       ((DirectReplyMessage) gfmsg).registerProcessor();
     }
@@ -803,7 +814,31 @@ public class JGroupsMessenger implements Messenger {
         new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
       Version.CURRENT.writeOrdinal(out_stream, true);
       DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
-      DataSerializer.writeObject(gfmsg, out_stream);
+      boolean encode = encrypt != null && recipient != null;
+      if (encode) {
+        // Coordinator doesn't know our publicKey for a JoinRequest
+        if (gfmsg.getDSFID() == JOIN_REQUEST || gfmsg.getDSFID() == JOIN_RESPONSE) {
+          encode = false;
+        }
+      }
+      if (encode) {
+        logger.info("encoding {}", gfmsg);
+        try {
+          out_stream.writeBoolean(true); // TODO we should have flag bits
+          HeapDataOutputStream out_stream2 =
+            new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+          DataSerializer.writeObject(gfmsg, out_stream2);
+          byte[] payload = out_stream2.toByteArray();
+          payload = encrypt.encryptData(payload, recipient);
+          DataSerializer.writeByteArray(payload, out_stream);
+        } catch (Exception e) {
+          throw new GemFireIOException("unable to send message", e);
+        }
+      } else {
+        logger.info("not encoding {}", gfmsg);
+        out_stream.writeBoolean(false);
+        DataSerializer.writeObject(gfmsg, out_stream);
+      }
       msg.setBuffer(out_stream.toByteArray());
       services.getStatistics().endMsgSerialization(start);
     }
@@ -869,7 +904,8 @@ public class JGroupsMessenger implements Messenger {
     try {
       long start = services.getStatistics().startMsgDeserialization();
       
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, 
+
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf,
           jgmsg.getOffset(), jgmsg.getLength()));
 
       short ordinal = Version.readOrdinal(dis);
@@ -878,9 +914,27 @@ public class JGroupsMessenger implements Messenger {
         dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
             ordinal, true));
       }
-      
+
       GMSMember m = DataSerializer.readObject(dis);
 
+      sender = getMemberFromView(m, ordinal);
+
+      boolean encrypted = dis.readBoolean();
+
+      if (encrypted && encrypt != null) {
+        byte[] payload = DataSerializer.readByteArray(dis);
+        try {
+          payload = encrypt.decryptData(payload, sender);
+          dis = new DataInputStream(new ByteArrayInputStream(payload));
+          if (ordinal < Version.CURRENT_ORDINAL) {
+            dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
+              ordinal, true));
+          }
+        } catch (Exception e) {
+          throw new GemFireIOException("unable to receive message", e);
+        }
+      }
+
       result = DataSerializer.readObject(dis);
 
       DistributionMessage dm = (DistributionMessage)result;
@@ -891,8 +945,6 @@ public class JGroupsMessenger implements Messenger {
       // request's sender ID
       if (dm.getDSFID() == JOIN_REQUEST) {
         sender = ((JoinRequestMessage)dm).getMemberID();
-      } else {
-        sender = getMemberFromView(m, ordinal);
       }
       ((DistributionMessage)result).setSender(sender);
       
@@ -914,25 +966,33 @@ public class JGroupsMessenger implements Messenger {
   /** look for certain messages that may need to be altered before being sent */
   void filterOutgoingMessage(DistributionMessage m) {
     switch (m.getDSFID()) {
-    case JOIN_RESPONSE:
-      JoinResponseMessage jrsp = (JoinResponseMessage)m;
-      
-      if (jrsp.getRejectionMessage() == null
+      case JOIN_REQUEST:
+        if (encrypt == null) {
+          break;
+        }
+        JoinRequestMessage joinMsg = (JoinRequestMessage)m;
+        joinMsg.setPublicKey(encrypt.getPublicKeyBytes());
+        break;
+
+      case JOIN_RESPONSE:
+        JoinResponseMessage jrsp = (JoinResponseMessage)m;
+
+        if (jrsp.getRejectionMessage() == null
           &&  services.getConfig().getTransport().isMcastEnabled()) {
-        // get the multicast message digest and pass it with the join response
-        Digest digest = (Digest)this.myChannel.getProtocolStack()
+          // get the multicast message digest and pass it with the join response
+          Digest digest = (Digest)this.myChannel.getProtocolStack()
             .getTopProtocol().down(Event.GET_DIGEST_EVT);
-        HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
-        try {
-          digest.writeTo(hdos);
-        } catch (Exception e) {
-          logger.fatal("Unable to serialize JGroups messaging digest", e);
+          HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
+          try {
+            digest.writeTo(hdos);
+          } catch (Exception e) {
+            logger.fatal("Unable to serialize JGroups messaging digest", e);
+          }
+          jrsp.setMessengerData(hdos.toByteArray());
         }
-        jrsp.setMessengerData(hdos.toByteArray());
-      }
-      break;
-    default:
-      break;
+        break;
+      default:
+        break;
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java
index 7a4971f..c050751 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.util.Properties;
 
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.GMSEncrypt;
 import org.apache.logging.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -217,6 +218,143 @@ public class MembershipJUnitTest {
     }
   }
   
+  /**
+   * Joins two membership managers through a locator with
+   * SECURITY_CLIENT_DHALGO_NAME set to "AES:128".
+   *
+   * The test waits for both managers to report the same
+   * two-member view, sends a SerialAckedMessage from the
+   * first manager to the second and verifies its delivery,
+   * then shuts the second manager down and checks that the
+   * first manager's view shrinks to a single member.
+   * The GMSEncrypt encoding/decoding counters are only
+   * printed at the end; they are not asserted.
+   */
+  @Test
+  public void testLocatorAndTwoServersJoinUsingDiffeHellman() throws Exception {
+
+    MembershipManager m1=null, m2=null;
+    Locator l = null;
+    int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
+
+    try {
+
+      // boot up a locator
+      int port = AvailablePortHelper.getRandomAvailableTCPPort();
+      InetAddress localHost = SocketCreator.getLocalHost();
+
+      // this locator will hook itself up with the first MembershipManager
+      // to be created
+      l = InternalLocator.startLocator(port, new File(""), null,
+        null, null, localHost, false, new Properties(), true, false, null,
+        false);
+
+      // create configuration objects
+      Properties nonDefault = new Properties();
+      nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true");
+      nonDefault.put(DistributionConfig.MCAST_PORT_NAME, String.valueOf(mcastPort));
+      nonDefault.put(DistributionConfig.LOG_FILE_NAME, "");
+      nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
+      nonDefault.put(DistributionConfig.GROUPS_NAME, "red, blue");
+      nonDefault.put(DistributionConfig.MEMBER_TIMEOUT_NAME, "2000");
+      nonDefault.put(DistributionConfig.LOCATORS_NAME, localHost.getHostName()+'['+port+']');
+      nonDefault.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+      DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
+      RemoteTransportConfig transport = new RemoteTransportConfig(config,
+        DistributionManager.NORMAL_DM_TYPE);
+
+      // start the first membership manager
+      try {
+        System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
+        DistributedMembershipListener listener1 = mock(DistributedMembershipListener.class);
+        DMStats stats1 = mock(DMStats.class);
+        System.out.println("creating 1st membership manager");
+        m1 = MemberFactory.newMembershipManager(listener1, config, transport, stats1);
+        m1.startEventProcessing();
+      } finally {
+        System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
+      }
+
+      // start the second membership manager
+      DistributedMembershipListener listener2 = mock(DistributedMembershipListener.class);
+      DMStats stats2 = mock(DMStats.class);
+      System.out.println("creating 2nd membership manager");
+      m2 = MemberFactory.newMembershipManager(listener2, config, transport, stats2);
+      m2.startEventProcessing();
+
+      // we have to check the views with JoinLeave because the membership
+      // manager queues new views for processing through the DM listener,
+      // which is a mock object in this test
+      System.out.println("waiting for views to stabilize");
+      JoinLeave jl1 = ((GMSMembershipManager)m1).getServices().getJoinLeave();
+      JoinLeave jl2 = ((GMSMembershipManager)m2).getServices().getJoinLeave();
+      long giveUp = System.currentTimeMillis() + 15000;
+      for (;;) {
+        try {
+          assertTrue("view = " + jl2.getView(), jl2.getView().size() == 2);
+          assertTrue("view = " + jl1.getView(), jl1.getView().size() == 2);
+          assertTrue(jl1.getView().getCreator().equals(jl2.getView().getCreator()));
+          assertTrue(jl1.getView().getViewId() == jl2.getView().getViewId());
+          break;
+        } catch  (AssertionError e) {
+          if (System.currentTimeMillis() > giveUp) {
+            throw e;
+          }
+        }
+      }
+
+      System.out.println("testing multicast availability");
+      assertTrue(m1.testMulticast());
+
+      System.out.println("multicasting SerialAckedMessage from m1 to m2");
+      SerialAckedMessage msg = new SerialAckedMessage();
+      msg.setRecipient(m2.getLocalMember());
+      msg.setMulticast(true);
+      m1.send(new InternalDistributedMember[] {m2.getLocalMember()}, msg, null);
+      giveUp = System.currentTimeMillis() + 5000;
+      boolean verified = false;
+      Throwable problem = null;
+      while (giveUp > System.currentTimeMillis()) {
+        try {
+          verify(listener2).messageReceived(isA(SerialAckedMessage.class));
+          verified = true;
+          break;
+        } catch (Error e) {
+          problem = e;
+          Thread.sleep(500);
+        }
+      }
+      if (!verified) {
+        if (problem != null) {
+          problem.printStackTrace();
+        }
+        fail("Expected a multicast message to be received");
+      }
+
+      // let the managers idle for a while and get used to each other
+      Thread.sleep(4000l);
+
+      m2.shutdown();
+      assertTrue(!m2.isConnected());
+
+      assertTrue(m1.getView().size() == 1);
+
+      System.out.println("encodings performed: " + GMSEncrypt.encodingsPerformed + "; decodings performed: " + GMSEncrypt.decodingsPerformed);
+    }
+    finally {
+
+      if (m2 != null) {
+        m2.shutdown();
+      }
+      if (m1 != null) {
+        m1.shutdown();
+      }
+      if (l != null) {
+        l.stop();
+      }
+    }
+  }
+
   @Test
   public void testJoinTimeoutSetting() throws Exception {
     long timeout = 30000;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
new file mode 100755
index 0000000..a591e47
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
@@ -0,0 +1,97 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+import static org.mockito.Mockito.*;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Checks that two GMSEncrypt instances can decrypt each other's data (created by bschuchardt on 5/6/2016).
+ */
+public class GMSEncryptJUnitTest {
+
+  Services services;
+
+  InternalDistributedMember mockMembers[];
+
+  NetView netView;
+
+  private void initMocks() throws Exception {
+    Properties nonDefault = new Properties();
+    nonDefault.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+    DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
+    RemoteTransportConfig tconfig = new RemoteTransportConfig(config,
+      DistributionManager.NORMAL_DM_TYPE);
+
+    ServiceConfig serviceConfig = new ServiceConfig(tconfig, config);
+
+    services = mock(Services.class);
+    when(services.getConfig()).thenReturn(serviceConfig);
+
+    mockMembers = new InternalDistributedMember[4];
+    for (int i = 0; i < mockMembers.length; i++) {
+      mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i);
+    }
+    int viewId = 1;
+    List<InternalDistributedMember> mbrs = new LinkedList<>();
+    mbrs.add(mockMembers[0]);
+    mbrs.add(mockMembers[1]);
+    mbrs.add(mockMembers[2]);
+
+    // build the view from the first three members, with mockMembers[0] as creator
+    netView = new NetView(mockMembers[0], viewId, mbrs);
+
+  }
+
+
+  @Test
+  public void testOneMemberCanDecryptAnothersMessage() throws Exception{
+    initMocks();
+
+    GMSEncrypt gmsEncrypt1 = new GMSEncrypt(services); // this will be the sender
+    GMSEncrypt gmsEncrypt2 = new GMSEncrypt(services); // this will be the receiver
+
+    // establish the public keys for the sender and receiver
+    netView.setPublicKey(mockMembers[1], gmsEncrypt1.getPublicKeyBytes());
+    netView.setPublicKey(mockMembers[2], gmsEncrypt2.getPublicKeyBytes());
+
+    gmsEncrypt1.installView(netView);
+    gmsEncrypt2.installView(netView);
+
+    // sender encrypts a message, so use receiver's public key
+    String ch = "Hello world";
+    byte[] challenge =  ch.getBytes();
+    byte[]  encryptedChallenge =  gmsEncrypt1.encryptData(challenge, mockMembers[2]);
+
+    // receiver decrypts the message using the sender's public key
+    byte[] decryptBytes = gmsEncrypt2.decryptData(encryptedChallenge,  mockMembers[1]);
+
+    // now send a response
+    String response = "Hello yourself!";
+    byte[] responseBytes = response.getBytes();
+    byte[] encryptedResponse = gmsEncrypt2.encryptData(responseBytes, mockMembers[1]);
+
+    // the original sender (gmsEncrypt1) decrypts the response
+    byte[] decryptedResponse = gmsEncrypt1.decryptData(encryptedResponse,  mockMembers[2]);
+
+    Assert.assertFalse(Arrays.equals(challenge, encryptedChallenge));
+
+    Assert.assertTrue(Arrays.equals(challenge, decryptBytes));
+
+    Assert.assertFalse(Arrays.equals(responseBytes, encryptedResponse));
+
+    Assert.assertTrue(Arrays.equals(responseBytes, decryptedResponse));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a28b223a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 2ad970b..60e790b 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -431,13 +431,13 @@ public class JGroupsMessengerJUnitTest {
     InternalDistributedMember sender = createAddress(8888);
     JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
     
-    Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL);
+    Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, null, Version.CURRENT_ORDINAL);
     interceptor.up(new Event(Event.MSG, jmsg));
     
     verify(mh, times(1)).processMessage(any(JoinRequestMessage.class));
     
     LeaveRequestMessage lmsg = new LeaveRequestMessage(messenger.localAddress, sender, "testing");
-    jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, Version.CURRENT_ORDINAL);
+    jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, null, Version.CURRENT_ORDINAL);
     interceptor.up(new Event(Event.MSG, jmsg));
     
     verify(manager).processMessage(any(LeaveRequestMessage.class));
@@ -805,7 +805,7 @@ public class JGroupsMessengerJUnitTest {
       dmsg.setRecipients(recipients);
   
       // a message is ignored during manager shutdown
-      msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
+      msg = messenger.createJGMessage(dmsg, new JGAddress(other), null, Version.CURRENT_ORDINAL);
       when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
       receiver.receive(msg);
       verify(manager, never()).processMessage(isA(DistributionMessage.class));