You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by wi...@apache.org on 2018/04/17 16:57:13 UTC
[geode] branch develop updated: GEODE-4986: refactoring GMS
encryption (#1715)
This is an automated email from the ASF dual-hosted git repository.
wirebaron pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d1a5ecf GEODE-4986: refactoring GMS encryption (#1715)
d1a5ecf is described below
commit d1a5ecf767961ff023cd6e1c5418e4e23abfd75c
Author: Brian Rowe <br...@pivotal.io>
AuthorDate: Tue Apr 17 09:57:08 2018 -0700
GEODE-4986: refactoring GMS encryption (#1715)
GEODE-4986: refactoring GMS encryption
---
.../distributed/internal/DistributionConfig.java | 1 -
.../membership/gms/messenger/GMSEncrypt.java | 428 ++++-----------------
.../gms/messenger/GMSEncryptionCipherPool.java | 92 +++++
.../membership/gms/messenger/JGroupsMessenger.java | 5 +-
.../gms/messenger/GMSEncryptJUnitTest.java | 54 ++-
.../gms/messenger/JGroupsMessengerJUnitTest.java | 22 +-
6 files changed, 215 insertions(+), 387 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index be5df89..dbbe3df 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -4945,5 +4945,4 @@ public interface DistributionConfig extends Config, LogConfig {
Collections.sort(atts);
return atts.toArray(new String[atts.size()]);
}
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
index 4b33454..dabab16 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -20,7 +20,6 @@ import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
-import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,13 +31,12 @@ import javax.crypto.spec.DHParameterSpec;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
-import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember.InternalDistributedMemberWrapper;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.Services;
-public final class GMSEncrypt implements Cloneable {
+public final class GMSEncrypt {
// Parameters for the Diffie-Hellman key exchange
private static final BigInteger dhP =
new BigInteger("13528702063991073999718992897071702177131142188276542919088770094024269"
@@ -56,119 +54,125 @@ public final class GMSEncrypt implements Cloneable {
private static final int dhL = 1023;
- private PrivateKey dhPrivateKey = null;
+ private final PrivateKey dhPrivateKey;
- private PublicKey dhPublicKey = null;
+ private final PublicKey dhPublicKey;
- private String dhSKAlgo = null;
+ private final String dhSKAlgo;
private Services services;
- private InternalDistributedMember localMember;
-
private NetView view;
- private static final int numberOfPeerEncryptorCopies = Integer.getInteger(
- "GMSEncrypt.MAX_ENCRYPTORS", Math.max(Runtime.getRuntime().availableProcessors() * 4, 16));
- /**
- * Keeps multiple copies for peer
- */
- private ConcurrentHashMap<InternalDistributedMember, PeerEncryptor>[] copyOfPeerEncryptors;
- /**
- * Keeps multiple copies of cluster keys
- */
- private ClusterEncryptor[] copyOfClusterEncryptors;
-
/**
* it keeps PK for peers
*/
- private Map<InternalDistributedMemberWrapper, byte[]> memberToPeerEncryptor =
+ private final Map<InternalDistributedMemberWrapper, byte[]> memberToPublicKey =
new ConcurrentHashMap<>();
- private ClusterEncryptor clusterEncryptor;
+ private final ConcurrentHashMap<InternalDistributedMember, GMSEncryptionCipherPool> peerEncryptors =
+ new ConcurrentHashMap<>();
+
+ private GMSEncryptionCipherPool clusterEncryptor;
protected void installView(NetView view) {
this.view = view;
this.view.setPublicKey(services.getJoinLeave().getMemberID(), getPublicKeyBytes());
}
- protected void installView(NetView view, InternalDistributedMember mbr) {
+ void overrideInstallViewForTest(NetView view) {
this.view = view;
}
protected byte[] getClusterSecretKey() {
- return this.clusterEncryptor.secretBytes;
+ if (this.clusterEncryptor != null) {
+ return this.clusterEncryptor.getSecretBytes();
+ } else {
+ return null;
+ }
}
protected synchronized void initClusterSecretKey() throws Exception {
if (this.clusterEncryptor == null) {
- this.clusterEncryptor = new ClusterEncryptor(this);
+ this.clusterEncryptor = new GMSEncryptionCipherPool(this, generateSecret(dhPublicKey));
}
}
- protected synchronized void addClusterKey(byte[] secretBytes) {
- this.clusterEncryptor = new ClusterEncryptor(secretBytes);
+ protected synchronized void setClusterKey(byte[] secretBytes) {
+ this.clusterEncryptor = new GMSEncryptionCipherPool(this, secretBytes);
}
- private GMSEncrypt() {
- initEncryptors();
- }
-
- private byte[] getRegisteredPublicKey(InternalDistributedMember mbr) {
+ private byte[] getPublicKeyIfIAmLocator(InternalDistributedMember mbr) {
return services.getPublicKey(mbr);
}
- GMSEncrypt(Services services) throws Exception {
+ GMSEncrypt(Services services, String dhSKAlgo) throws Exception {
this.services = services;
- initEncryptors();
- initDHKeys(services.getConfig().getDistributionConfig());
- }
- GMSEncrypt(Services services, InternalDistributedMember mbr) throws Exception {
- this.services = services;
- this.localMember = mbr;
- initEncryptors();
- initDHKeys(services.getConfig().getDistributionConfig());
- }
+ this.dhSKAlgo = dhSKAlgo;
+
+ // Initialize the keys when either the host is a peer that has
+ // non-blank setting for DH symmetric algo, or this is a server
+ // that has authenticator defined.
+ if ((this.dhSKAlgo != null && this.dhSKAlgo.length() > 0)) {
+ KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DH");
+ DHParameterSpec dhSpec = new DHParameterSpec(dhP, dhG, dhL);
+ keyGen.initialize(dhSpec);
+ KeyPair keypair = keyGen.generateKeyPair();
- private void initEncryptors() {
- copyOfPeerEncryptors = new ConcurrentHashMap[numberOfPeerEncryptorCopies];
- copyOfClusterEncryptors = new ClusterEncryptor[numberOfPeerEncryptorCopies];
+ // Get the generated public and private keys
+ dhPrivateKey = keypair.getPrivate();
+ dhPublicKey = keypair.getPublic();
+ } else {
+ dhPrivateKey = null;
+ dhPublicKey = null;
+ }
}
- public byte[] decryptData(byte[] data, InternalDistributedMember member) throws Exception {
+ byte[] decryptData(byte[] data, InternalDistributedMember member) throws Exception {
return getPeerEncryptor(member).decryptBytes(data);
}
- public byte[] encryptData(byte[] data, InternalDistributedMember member) throws Exception {
+ byte[] encryptData(byte[] data, InternalDistributedMember member) throws Exception {
return getPeerEncryptor(member).encryptBytes(data);
}
- public byte[] decryptData(byte[] data) throws Exception {
- return getClusterEncryptor().decryptBytes(data);
+ byte[] decryptData(byte[] data) throws Exception {
+ return clusterEncryptor.decryptBytes(data);
}
- public byte[] decryptData(byte[] data, byte[] pkBytes) throws Exception {
- PeerEncryptor encryptor = new PeerEncryptor(pkBytes);
+ byte[] decryptData(byte[] data, byte[] pkBytes) throws Exception {
+ GMSEncryptionCipherPool encryptor = new GMSEncryptionCipherPool(this, generateSecret(pkBytes));
return encryptor.decryptBytes(data);
}
- public byte[] encryptData(byte[] data) throws Exception {
- return getClusterEncryptor().encryptBytes(data);
+ byte[] encryptData(byte[] data) throws Exception {
+ return clusterEncryptor.encryptBytes(data);
}
- protected byte[] getPublicKeyBytes() {
+ byte[] getPublicKeyBytes() {
return dhPublicKey.getEncoded();
}
+ private byte[] lookupKeyByMember(InternalDistributedMember member) {
+ byte[] pk = memberToPublicKey.get(new InternalDistributedMemberWrapper(member));
+ if (pk == null) {
+ pk = getPublicKeyIfIAmLocator(member);
+ }
+ if (pk == null) {
+ pk = (byte[]) view.getPublicKey(member);
+ }
+ return pk;
+ }
+
protected byte[] getPublicKey(InternalDistributedMember member) {
try {
InternalDistributedMember localMbr = services.getMessenger().getMemberID();
if (localMbr != null && localMbr.equals(member)) {
return this.dhPublicKey.getEncoded();// local one
}
- return getPeerEncryptor(member).peerPublicKey.getEncoded();
+ return lookupKeyByMember(member);
} catch (Exception e) {
throw new RuntimeException("Not found public key for member " + member, e);
}
@@ -176,125 +180,24 @@ public final class GMSEncrypt implements Cloneable {
protected void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
try {
- memberToPeerEncryptor.put(new InternalDistributedMemberWrapper(mbr), publickey);
- synchronized (copyOfPeerEncryptors) {
- // remove all the existing keys..
- for (Map m : copyOfPeerEncryptors) {
- if (m != null)
- m.remove(mbr);
- }
- }
+ memberToPublicKey.put(new InternalDistributedMemberWrapper(mbr), publickey);
+ peerEncryptors.replace(mbr, new GMSEncryptionCipherPool(this, generateSecret(publickey)));
} catch (Exception e) {
throw new RuntimeException("Unable to create peer encryptor " + mbr, e);
}
}
- @Override
- protected GMSEncrypt clone() {
- try {
- GMSEncrypt gmsEncrypt = new GMSEncrypt();
- gmsEncrypt.localMember = this.localMember;
- gmsEncrypt.dhSKAlgo = this.dhSKAlgo;
- gmsEncrypt.services = this.services;
-
- X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(this.dhPublicKey.getEncoded());
- KeyFactory keyFact = KeyFactory.getInstance("DH");
- gmsEncrypt.dhPublicKey = keyFact.generatePublic(x509KeySpec);
- final String format = this.dhPrivateKey.getFormat();
- System.out.println("private key format " + format);
- System.out.println("public ksy format " + this.dhPublicKey.getFormat());
- PKCS8EncodedKeySpec x509KeySpecPKey = new PKCS8EncodedKeySpec(this.dhPrivateKey.getEncoded());
-
- keyFact = KeyFactory.getInstance("DH");
- gmsEncrypt.dhPrivateKey = keyFact.generatePrivate(x509KeySpecPKey);
-
- return gmsEncrypt;
- } catch (Exception e) {
- throw new RuntimeException("Unable to clone", e);
- }
- }
-
- /**
- * Initialize the Diffie-Hellman keys. This method is not thread safe
- */
- private void initDHKeys(DistributionConfig config) throws Exception {
-
- dhSKAlgo = config.getSecurityUDPDHAlgo();
- // Initialize the keys when either the host is a peer that has
- // non-blank setting for DH symmetric algo, or this is a server
- // that has authenticator defined.
- if ((dhSKAlgo != null && dhSKAlgo.length() > 0)) {
- KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DH");
- DHParameterSpec dhSpec = new DHParameterSpec(dhP, dhG, dhL);
- keyGen.initialize(dhSpec);
- KeyPair keypair = keyGen.generateKeyPair();
-
- // Get the generated public and private keys
- dhPrivateKey = keypair.getPrivate();
- dhPublicKey = keypair.getPublic();
- }
- }
-
- private PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
- Map<InternalDistributedMember, PeerEncryptor> m = getPeerEncryptorMap();
-
- PeerEncryptor result = m.get(member);
- if (result == null) {
- synchronized (this) {
- result = m.get(member);
- if (result == null) {
- byte[] pk = memberToPeerEncryptor.get(new InternalDistributedMemberWrapper(member));
- if (pk == null) {
- pk = getRegisteredPublicKey(member);
- }
- result =
- createPeerEncryptor(member, pk != null ? pk : (byte[]) view.getPublicKey(member));
- m.put(member, result);
- }
- }
- }
- return result;
- }
-
- private Map<InternalDistributedMember, PeerEncryptor> getPeerEncryptorMap() {
- int h = Math.abs(Thread.currentThread().getName().hashCode() % numberOfPeerEncryptorCopies);
- ConcurrentHashMap<InternalDistributedMember, PeerEncryptor> m = copyOfPeerEncryptors[h];
-
- if (m == null) {
- synchronized (copyOfPeerEncryptors) {
- m = copyOfPeerEncryptors[h];
- if (m == null) {
- m = new ConcurrentHashMap<>();
- copyOfPeerEncryptors[h] = m;
- }
- }
- }
- return m;
- }
-
- private ClusterEncryptor getClusterEncryptor() {
- int h = Math.abs(Thread.currentThread().getName().hashCode() % numberOfPeerEncryptorCopies);
- ClusterEncryptor c = copyOfClusterEncryptors[h];
-
- if (c == null) {
- synchronized (copyOfClusterEncryptors) {
- c = copyOfClusterEncryptors[h];
- if (c == null) {
- c = new ClusterEncryptor(getClusterSecretKey());
- copyOfClusterEncryptors[h] = c;
- }
- }
- }
- return c;
- }
-
- private PeerEncryptor createPeerEncryptor(InternalDistributedMember member, byte[] peerKeyBytes)
+ private GMSEncryptionCipherPool getPeerEncryptor(InternalDistributedMember member)
throws Exception {
- PeerEncryptor result = new PeerEncryptor(peerKeyBytes);
- return result;
+ return peerEncryptors.computeIfAbsent(member, (mbr) -> {
+ try {
+ return new GMSEncryptionCipherPool(this, generateSecret(lookupKeyByMember(member)));
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ });
}
-
private static int getKeySize(String skAlgo) {
// skAlgo contain both algo and key size info
int colIdx = skAlgo.indexOf(':');
@@ -341,98 +244,7 @@ public final class GMSEncrypt implements Cloneable {
return blocksize;
}
- private static byte[] encryptBytes(byte[] data, Cipher encrypt) throws Exception {
- return encrypt.doFinal(data);
- }
-
- private static byte[] decryptBytes(byte[] data, Cipher decrypt) throws Exception {
- return decrypt.doFinal(data);
- }
-
- private class PeerEncryptor {
-
- private PublicKey peerPublicKey;
-
- private String peerSKAlgo = null;
-
- private Cipher encrypt;
-
- private Cipher decrypt = null;
-
- private PeerEncryptor(byte[] peerPublicKeyBytes) throws Exception {
- this.peerPublicKey = getPublicKey(peerPublicKeyBytes);
- }
-
- private synchronized byte[] encryptBytes(byte[] data) throws Exception {
- String algo;
- if (this.peerSKAlgo != null) {
- algo = this.peerSKAlgo;
- } else {
- algo = dhSKAlgo;
- }
- return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo));
- }
-
- private Cipher getEncryptCipher(String dhSKAlgo) throws Exception {
- if (encrypt == null) {
- encrypt = GMSEncrypt.getEncryptCipher(dhSKAlgo, dhPrivateKey, this.peerPublicKey);
- }
- return encrypt;
- }
-
- public synchronized byte[] decryptBytes(byte[] data) throws Exception {
- String algo = null;
- if (this.peerSKAlgo != null) {
- algo = this.peerSKAlgo;
- } else {
- algo = dhSKAlgo;
- }
- Cipher c = getDecryptCipher(algo, this.peerPublicKey);
- return GMSEncrypt.decryptBytes(data, c);
-
- }
-
- private Cipher getDecryptCipher(String dhSKAlgo, PublicKey publicKey) throws Exception {
- if (decrypt == null) {
- decrypt = GMSEncrypt.getDecryptCipher(dhSKAlgo, dhPrivateKey, this.peerPublicKey);
- }
- return decrypt;
- }
-
- }
-
- // this needs to synchronize as it uses private key of that member
- private static synchronized Cipher getEncryptCipher(String dhSKAlgo, PrivateKey privateKey,
- PublicKey peerPublicKey) throws Exception {
- KeyAgreement ka = KeyAgreement.getInstance("DH");
- ka.init(privateKey);
- ka.doPhase(peerPublicKey, true);
-
- Cipher encrypt;
-
- int keysize = getKeySize(dhSKAlgo);
- int blocksize = getBlockSize(dhSKAlgo);
-
- if (keysize == -1 || blocksize == -1) {
- SecretKey sKey = ka.generateSecret(dhSKAlgo);
- encrypt = Cipher.getInstance(dhSKAlgo);
- encrypt.init(Cipher.ENCRYPT_MODE, sKey);
- } else {
- String dhAlgoStr = getDhAlgoStr(dhSKAlgo);
-
- byte[] sKeyBytes = ka.generateSecret();
- SecretKeySpec sks = new SecretKeySpec(sKeyBytes, 0, keysize, dhAlgoStr);
- IvParameterSpec ivps = new IvParameterSpec(sKeyBytes, keysize, blocksize);
-
- encrypt = Cipher.getInstance(dhAlgoStr + "/CBC/PKCS5Padding");
- encrypt.init(Cipher.ENCRYPT_MODE, sks, ivps);
- }
-
- return encrypt;
- }
-
- private static Cipher getEncryptCipher(String dhSKAlgo, byte[] secretBytes) throws Exception {
-
+ Cipher getEncryptCipher(byte[] secretBytes) throws Exception {
Cipher encrypt = null;
int keysize = getKeySize(dhSKAlgo);
@@ -443,8 +255,8 @@ public final class GMSEncrypt implements Cloneable {
encrypt = Cipher.getInstance(dhSKAlgo);
encrypt.init(Cipher.ENCRYPT_MODE, sks);
} else {
-
String dhAlgoStr = getDhAlgoStr(dhSKAlgo);
+
SecretKeySpec sks = new SecretKeySpec(secretBytes, 0, keysize, dhAlgoStr);
IvParameterSpec ivps = new IvParameterSpec(secretBytes, keysize, blocksize);
@@ -455,36 +267,7 @@ public final class GMSEncrypt implements Cloneable {
return encrypt;
}
- // this needs to synchronize as it uses private key of that member
- private static synchronized Cipher getDecryptCipher(String dhSKAlgo, PrivateKey privateKey,
- PublicKey publicKey) throws Exception {
- KeyAgreement ka = KeyAgreement.getInstance("DH");
- ka.init(privateKey);
- ka.doPhase(publicKey, true);
-
- Cipher decrypt;
-
- int keysize = getKeySize(dhSKAlgo);
- int blocksize = getBlockSize(dhSKAlgo);
-
- if (keysize == -1 || blocksize == -1) {
- SecretKey sKey = ka.generateSecret(dhSKAlgo);
- decrypt = Cipher.getInstance(dhSKAlgo);
- decrypt.init(Cipher.DECRYPT_MODE, sKey);
- } else {
- String algoStr = getDhAlgoStr(dhSKAlgo);
-
- byte[] sKeyBytes = ka.generateSecret();
- SecretKeySpec sks = new SecretKeySpec(sKeyBytes, 0, keysize, algoStr);
- IvParameterSpec ivps = new IvParameterSpec(sKeyBytes, keysize, blocksize);
-
- decrypt = Cipher.getInstance(algoStr + "/CBC/PKCS5Padding");
- decrypt.init(Cipher.DECRYPT_MODE, sks, ivps);
- }
- return decrypt;
- }
-
- private static Cipher getDecryptCipher(String dhSKAlgo, byte[] secretBytes) throws Exception {
+ Cipher getDecryptCipher(byte[] secretBytes) throws Exception {
Cipher decrypt;
int keysize = getKeySize(dhSKAlgo);
@@ -506,6 +289,17 @@ public final class GMSEncrypt implements Cloneable {
return decrypt;
}
+ private byte[] generateSecret(byte[] peerKeyBytes) throws Exception {
+ X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(peerKeyBytes);
+ KeyFactory keyFact = KeyFactory.getInstance("DH");
+ PublicKey peerKey = keyFact.generatePublic(x509KeySpec);
+ return generateSecret(dhSKAlgo, dhPrivateKey, peerKey);
+ }
+
+ private byte[] generateSecret(PublicKey peerKey) throws Exception {
+ return generateSecret(dhSKAlgo, dhPrivateKey, peerKey);
+ }
+
private static byte[] generateSecret(String dhSKAlgo, PrivateKey privateKey,
PublicKey otherPublicKey) throws Exception {
KeyAgreement ka = KeyAgreement.getInstance("DH");
@@ -522,62 +316,4 @@ public final class GMSEncrypt implements Cloneable {
return ka.generateSecret();
}
}
-
- private static PublicKey getPublicKey(byte[] publicKeyBytes) throws Exception {
- X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(publicKeyBytes);
- KeyFactory keyFact = KeyFactory.getInstance("DH");
- return keyFact.generatePublic(x509KeySpec);
- }
-
- /***
- * this will hold the common key for cluster
- */
- private class ClusterEncryptor {
- byte[] secretBytes;
- Cipher encrypt;
- Cipher decrypt;
-
- private ClusterEncryptor(GMSEncrypt other) throws Exception {
- GMSEncrypt mine = new GMSEncrypt(other.services);
- this.secretBytes =
- GMSEncrypt.generateSecret(mine.dhSKAlgo, mine.dhPrivateKey, other.dhPublicKey);
- }
-
- private ClusterEncryptor(byte[] sb) {
- this.secretBytes = sb;
- }
-
- private synchronized byte[] encryptBytes(byte[] data) throws Exception {
- String algo = dhSKAlgo;
- return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo));
- }
-
- private Cipher getEncryptCipher(String dhSKAlgo) throws Exception {
- if (encrypt == null) {
- synchronized (this) {
- if (encrypt == null) {
- encrypt = GMSEncrypt.getEncryptCipher(dhSKAlgo, secretBytes);
- }
- }
- }
- return encrypt;
- }
-
- private synchronized byte[] decryptBytes(byte[] data) throws Exception {
- String algo = dhSKAlgo;
- Cipher c = getDecryptCipher(algo);
- return GMSEncrypt.decryptBytes(data, c);
- }
-
- private Cipher getDecryptCipher(String dhSKAlgo) throws Exception {
- if (decrypt == null) {
- synchronized (this) {
- if (decrypt == null) {
- decrypt = GMSEncrypt.getDecryptCipher(dhSKAlgo, secretBytes);
- }
- }
- }
- return decrypt;
- }
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptionCipherPool.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptionCipherPool.java
new file mode 100644
index 0000000..fb19074
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptionCipherPool.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms.messenger;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.Cipher;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.distributed.internal.membership.gms.Services;
+
+public class GMSEncryptionCipherPool {
+ private static final int MAX_CIPHERS_PER_POOL = Integer.getInteger("GMSEncrypt.MAX_ENCRYPTORS",
+ Math.max(Runtime.getRuntime().availableProcessors() * 4, 16));
+ private static final int MAX_CIPHER_WAIT_IN_SEC = 10;
+ private static final Logger logger = Services.getLogger();
+
+ private final GMSEncrypt gmsEncrypt;
+ private final byte[] secretBytes;
+ private final BlockingQueue<Cipher> encryptCipherQueue = new LinkedBlockingQueue<>();
+ private final AtomicInteger encryptCipherCount = new AtomicInteger(0);
+ private final BlockingQueue<Cipher> decryptCipherQueue = new LinkedBlockingQueue<>();
+ private final AtomicInteger decryptCipherCount = new AtomicInteger(0);
+
+ GMSEncryptionCipherPool(GMSEncrypt gmsEncrypt, byte[] secretBytes) {
+ this.gmsEncrypt = gmsEncrypt;
+ this.secretBytes = secretBytes;
+ }
+
+ byte[] getSecretBytes() {
+ return secretBytes;
+ }
+
+ interface ThrowingFunction<T, R> {
+ R apply(T in) throws Exception;
+ }
+
+ byte[] encryptBytes(byte[] data) throws Exception {
+ Cipher encrypt =
+ getOrCreateCipher(encryptCipherQueue, encryptCipherCount, gmsEncrypt::getEncryptCipher);
+ try {
+ return encrypt.doFinal(data);
+ } finally {
+ encryptCipherQueue.offer(encrypt);
+ }
+ }
+
+ byte[] decryptBytes(byte[] data) throws Exception {
+ Cipher decrypt =
+ getOrCreateCipher(decryptCipherQueue, decryptCipherCount, gmsEncrypt::getDecryptCipher);
+ try {
+ return decrypt.doFinal(data);
+ } finally {
+ decryptCipherQueue.offer(decrypt);
+ }
+ }
+
+ private Cipher getOrCreateCipher(BlockingQueue<Cipher> cipherQueue, AtomicInteger cipherCount,
+ ThrowingFunction<byte[], Cipher> maker) throws Exception {
+ Cipher cipher = cipherQueue.poll();
+ if (cipher == null) {
+ if (cipherCount.getAndIncrement() < MAX_CIPHERS_PER_POOL) {
+ cipher = maker.apply(secretBytes);
+ } else {
+ cipherCount.decrementAndGet();
+ cipher = cipherQueue.poll(MAX_CIPHER_WAIT_IN_SEC, TimeUnit.SECONDS);
+ }
+ }
+ if (cipher == null) {
+ logger.error("No encryption cipher available, exceeding max cipher threshold");
+ cipherCount.incrementAndGet();
+ cipher = maker.apply(secretBytes);
+ }
+ return cipher;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 83e15d7..83cbe71 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -278,7 +278,7 @@ public class JGroupsMessenger implements Messenger {
if (!dc.getSecurityUDPDHAlgo().isEmpty()) {
try {
- this.encrypt = new GMSEncrypt(services);
+ this.encrypt = new GMSEncrypt(services, dc.getSecurityUDPDHAlgo());
logger.info("Initializing GMSEncrypt ");
} catch (Exception e) {
throw new GemFireConfigException("problem initializing encryption protocol", e);
@@ -1052,7 +1052,6 @@ public class JGroupsMessenger implements Messenger {
byte[] pk = null;
if (readPK) {
- // need to read PK
pk = InternalDataSerializer.readByteArray(dis);
data = InternalDataSerializer.readByteArray(dis);
// using prefixed pk from sender
@@ -1361,7 +1360,7 @@ public class JGroupsMessenger implements Messenger {
public void setClusterSecretKey(byte[] clusterSecretKey) {
if (encrypt != null) {
logger.debug("Setting cluster key");
- encrypt.addClusterKey(clusterSecretKey);
+ encrypt.setClusterKey(clusterSecretKey);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
index eb4c913..db9fbdf 100755
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
@@ -44,6 +44,7 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class GMSEncryptJUnitTest {
private static final int THREAD_COUNT = 20;
+ private static final String DEFAULT_ALGO = "AES:128";
Services services;
@@ -56,7 +57,7 @@ public class GMSEncryptJUnitTest {
ExecutorServiceRule.builder().threadCount(THREAD_COUNT).build();
private void initMocks() throws Exception {
- initMocks("AES:128");
+ initMocks(DEFAULT_ALGO);
}
private void initMocks(String algo) throws Exception {
@@ -93,15 +94,15 @@ public class GMSEncryptJUnitTest {
for (String algo : algos) {
initMocks(algo);
- GMSEncrypt sender = new GMSEncrypt(services, mockMembers[1]);
- GMSEncrypt receiver = new GMSEncrypt(services, mockMembers[2]);
+ GMSEncrypt sender = new GMSEncrypt(services, algo);
+ GMSEncrypt receiver = new GMSEncrypt(services, algo);
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], sender.getPublicKeyBytes());
netView.setPublicKey(mockMembers[2], receiver.getPublicKeyBytes());
- sender.installView(netView, mockMembers[1]);
- receiver.installView(netView, mockMembers[2]);
+ sender.overrideInstallViewForTest(netView);
+ receiver.overrideInstallViewForTest(netView);
// sender encrypts a message, so use receiver's public key
String ch = "Hello world";
@@ -134,15 +135,15 @@ public class GMSEncryptJUnitTest {
public void testOneMemberCanDecryptAnothersMessageMultithreaded() throws Exception {
initMocks();
final int runs = 100000;
- final GMSEncrypt sender = new GMSEncrypt(services, mockMembers[1]);
- final GMSEncrypt receiver = new GMSEncrypt(services, mockMembers[2]);
+ final GMSEncrypt sender = new GMSEncrypt(services, DEFAULT_ALGO);
+ final GMSEncrypt receiver = new GMSEncrypt(services, DEFAULT_ALGO);
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], sender.getPublicKeyBytes());
netView.setPublicKey(mockMembers[2], receiver.getPublicKeyBytes());
- sender.installView(netView, mockMembers[1]);
- receiver.installView(netView, mockMembers[2]);
+ sender.overrideInstallViewForTest(netView);
+ receiver.overrideInstallViewForTest(netView);
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int j = 0; j < THREAD_COUNT; j++)
@@ -195,18 +196,15 @@ public class GMSEncryptJUnitTest {
public void testPublicKeyPrivateKeyFromSameMember() throws Exception {
initMocks();
- GMSEncrypt sender = new GMSEncrypt(services, mockMembers[1]);
- GMSEncrypt receiver = new GMSEncrypt(services, mockMembers[2]);
-
- sender = sender.clone();
- receiver = receiver.clone();
+ GMSEncrypt sender = new GMSEncrypt(services, DEFAULT_ALGO);
+ GMSEncrypt receiver = new GMSEncrypt(services, DEFAULT_ALGO);
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], sender.getPublicKeyBytes());
netView.setPublicKey(mockMembers[2], receiver.getPublicKeyBytes());
- sender.installView(netView, mockMembers[1]);
- receiver.installView(netView, mockMembers[2]);
+ sender.overrideInstallViewForTest(netView);
+ receiver.overrideInstallViewForTest(netView);
// sender encrypts a message, so use receiver's public key
String ch = "Hello world";
@@ -238,12 +236,12 @@ public class GMSEncryptJUnitTest {
public void testForClusterSecretKey() throws Exception {
initMocks();
- GMSEncrypt sender = new GMSEncrypt(services, mockMembers[1]);
+ GMSEncrypt sender = new GMSEncrypt(services, DEFAULT_ALGO);
sender.initClusterSecretKey();
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], sender.getPublicKeyBytes());
- sender.installView(netView, mockMembers[1]);
+ sender.overrideInstallViewForTest(netView);
// sender encrypts a message, so use receiver's public key
String ch = "Hello world";
@@ -263,20 +261,20 @@ public class GMSEncryptJUnitTest {
for (String algo : algos) {
initMocks(algo);
- final GMSEncrypt sender = new GMSEncrypt(services, mockMembers[1]);
+ final GMSEncrypt sender = new GMSEncrypt(services, algo);
sender.initClusterSecretKey();
- final GMSEncrypt receiver = new GMSEncrypt(services, mockMembers[2]);
+ final GMSEncrypt receiver = new GMSEncrypt(services, algo);
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], sender.getPublicKeyBytes());
netView.setPublicKey(mockMembers[2], receiver.getPublicKeyBytes());
- sender.installView(netView, mockMembers[1]);
+ sender.overrideInstallViewForTest(netView);
byte[] secretBytes = sender.getClusterSecretKey();
- receiver.addClusterKey(secretBytes);
+ receiver.setClusterKey(secretBytes);
- receiver.installView(netView, mockMembers[1]);
+ receiver.overrideInstallViewForTest(netView);
// sender encrypts a message, so use receiver's public key
String ch = "Hello world";
@@ -308,21 +306,21 @@ public class GMSEncryptJUnitTest {
public void testForClusterSecretKeyFromOtherMemberMultipleThreads() throws Exception {
initMocks();
- final GMSEncrypt sender = new GMSEncrypt(services, mockMembers[1]);
+ final GMSEncrypt sender = new GMSEncrypt(services, DEFAULT_ALGO);
Thread.currentThread().sleep(100);
sender.initClusterSecretKey();
- final GMSEncrypt receiver = new GMSEncrypt(services, mockMembers[2]);
+ final GMSEncrypt receiver = new GMSEncrypt(services, DEFAULT_ALGO);
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], sender.getPublicKeyBytes());
netView.setPublicKey(mockMembers[2], receiver.getPublicKeyBytes());
- sender.installView(netView, mockMembers[1]);
+ sender.overrideInstallViewForTest(netView);
byte[] secretBytes = sender.getClusterSecretKey();
- receiver.addClusterKey(secretBytes);
+ receiver.setClusterKey(secretBytes);
- receiver.installView(netView, mockMembers[1]);
+ receiver.overrideInstallViewForTest(netView);
final int runs = 100000;
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 0fe835f..28b0588 100755
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -87,6 +87,8 @@ import org.apache.geode.test.junit.categories.MembershipTest;
@Category({IntegrationTest.class, MembershipTest.class})
public class JGroupsMessengerJUnitTest {
+ private static final String AES_128 = "AES:128";
+
private Services services;
private JGroupsMessenger messenger;
private JoinLeave joinLeave;
@@ -953,13 +955,14 @@ public class JGroupsMessengerJUnitTest {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
- p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
+ final String udpDhalgo = "AES:128";
+ p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, udpDhalgo);
initMocks(false, p);
NetView v = createView(otherMbr);
when(joinLeave.getMemberID(messenger.getMemberID().getNetMember()))
.thenReturn(messenger.getMemberID());
- GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, udpDhalgo);
messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
messenger.initClusterKey();
@@ -992,12 +995,13 @@ public class JGroupsMessengerJUnitTest {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
- p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
+
+ p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
initMocks(false, p);
NetView v = createView(otherMbr);
- GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
otherMbrEncrptor.setPublicKey(messenger.getPublicKey(messenger.getMemberID()),
messenger.getMemberID());
@@ -1033,12 +1037,12 @@ public class JGroupsMessengerJUnitTest {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
- p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
+ p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
initMocks(false, p);
NetView v = createView(otherMbr);
- GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
messenger.initClusterKey();
@@ -1067,12 +1071,12 @@ public class JGroupsMessengerJUnitTest {
InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
Properties p = new Properties();
- p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
+ p.put(ConfigurationProperties.SECURITY_UDP_DHALGO, AES_128);
initMocks(false, p);
NetView v = createView(otherMbr);
- GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services, AES_128);
otherMbrEncrptor.setPublicKey(messenger.getPublicKey(messenger.getMemberID()),
messenger.getMemberID());
@@ -1109,7 +1113,7 @@ public class JGroupsMessengerJUnitTest {
requestBytes = out.toByteArray();
- otherMbrEncrptor.addClusterKey(((JoinResponseMessage) gfMessageAtOtherMbr).getSecretPk());
+ otherMbrEncrptor.setClusterKey(((JoinResponseMessage) gfMessageAtOtherMbr).getSecretPk());
dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
--
To stop receiving notification emails like this one, please contact
wirebaron@apache.org.