You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/06/01 22:29:23 UTC
[1/3] incubator-geode git commit: GEODE-1372 added unit test and some
more fixes.
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1372 d6fb78341 -> 667c4259f
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
index 3a06c1c..701cc07 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
@@ -109,6 +109,11 @@ public class LocatorDUnitTest extends DistributedTestCase {
system = null;
}
}
+
+ //for child classes
+ protected void addDSProps(Properties p) {
+
+ }
//////// Test Methods
@@ -120,7 +125,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* it hung with the restarted locator trying to become elder again because
* it put its address at the beginning of the new view it sent out.
*/
- public void testCollocatedLocatorWithSecurity() throws Exception {
+ public void ntestCollocatedLocatorWithSecurity() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
@@ -139,6 +144,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("security-peer-authenticator", "com.gemstone.gemfire.distributed.MyAuthenticator.create");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
properties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+ addDSProps(properties);
system = (InternalDistributedSystem) DistributedSystem.connect(properties);
InternalDistributedMember mbr = system.getDistributedMember();
assertEquals("expected the VM to have NORMAL vmKind",
@@ -249,7 +255,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
*
* @throws Exception
*/
- public void testStartTwoLocators() throws Exception {
+ public void ntestStartTwoLocators() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM loc1 = host.getVM(1);
@@ -273,7 +279,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("member-timeout", "2000");
properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(properties);
SerializableCallable startLocator1 = new SerializableCallable("start locator1") {
@Override
public Object call() throws Exception {
@@ -362,7 +368,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
/**
* test lead member selection
*/
- public void testLeadMemberSelection() throws Exception {
+ public void ntestLeadMemberSelection() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
@@ -377,7 +383,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("locators", locators);
properties.put("enable-network-partition-detection", "true");
properties.put("disable-auto-reconnect", "true");
-
+ addDSProps(properties);
File logFile = new File("");
if (logFile.exists()) {
logFile.delete();
@@ -471,7 +477,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* We then kill the lead member and demonstrate that the original locator
* (which is now the sole remaining member) shuts itself down.
*/
- public void testLeadAndCoordFailure() throws Exception {
+ public void ntestLeadAndCoordFailure() throws Exception {
IgnoredException.addIgnoredException("Possible loss of quorum due");
disconnectAllFromDS();
Host host = Host.getHost(0);
@@ -497,7 +503,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
// properties.put("log-level", "fine");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(properties);
try {
final String uname = getUniqueName();
File logFile = new File("");
@@ -604,7 +610,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* We then shut down the group coordinator and observe the second locator
* pick up the job and the remaining member continues to operate normally.
*/
- public void testLeadFailureAndCoordShutdown() throws Exception {
+ public void ntestLeadFailureAndCoordShutdown() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
@@ -628,7 +634,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("disable-auto-reconnect", "true");
properties.put("member-timeout", "2000");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(properties);
SerializableRunnable stopLocator = getStopLocatorRunnable();
try {
@@ -750,7 +756,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
*/
// disabled on trunk - should be reenabled on cedar_dev_Oct12
// this test leaves a CloserThread around forever that logs "pausing" messages every 500 ms
- public void testForceDisconnectAndPeerShutdownCause() throws Exception {
+ public void ntestForceDisconnectAndPeerShutdownCause() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
@@ -774,7 +780,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("member-timeout", "2000");
properties.put("log-level", LogWriterUtils.getDUnitLogLevel());
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(properties);
SerializableRunnable stopLocator = getStopLocatorRunnable();
try {
@@ -887,7 +893,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* We kill the coordinator and shut down the lead member and observe the second locator
* pick up the job and the remaining member continue to operate normally.
*/
- public void testLeadShutdownAndCoordFailure() throws Exception {
+ public void ntestLeadShutdownAndCoordFailure() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
@@ -910,7 +916,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
properties.put("disable-auto-reconnect", "true");
properties.put("member-timeout", "2000");
properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(properties);
SerializableRunnable disconnect =
new SerializableRunnable("Disconnect from " + locators) {
public void run() {
@@ -1012,7 +1018,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* Tests that attempting to connect to a distributed system in which
* no locator is defined throws an exception.
*/
- public void testNoLocator() {
+ public void ntestNoLocator() {
disconnectAllFromDS();
Host host = Host.getHost(0);
int port =
@@ -1022,7 +1028,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
-
+ addDSProps(props);
final String expected = "java.net.ConnectException";
final String addExpected =
"<ExpectedException action=add>" + expected + "</ExpectedException>";
@@ -1072,7 +1078,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* <p>The locator is then restarted and is shown to take over the
* role of membership coordinator.
*/
- public void testOneLocator() throws Exception {
+ public void ntestOneLocator() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
@@ -1093,7 +1099,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
locProps.setProperty("mcast-port", "0");
locProps.setProperty("member-timeout", "1000");
locProps.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(locProps);
Locator.startLocatorAndDS(port, logFile, locProps);
} catch (IOException ex) {
com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port, ex);
@@ -1110,6 +1116,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
props.setProperty("member-timeout", "1000");
+ addDSProps(props);
DistributedSystem.connect(props);
}
};
@@ -1120,7 +1127,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
props.setProperty("member-timeout", "1000");
-
+ addDSProps(props);
system = (InternalDistributedSystem) DistributedSystem.connect(props);
final DistributedMember coord = MembershipManagerHelper.getCoordinator(system);
@@ -1164,7 +1171,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* is correct. It then restarts the locator to demonstrate that
* it can connect to and function as the group coordinator
*/
- public void testLocatorBecomesCoordinator() throws Exception {
+ public void ntestLocatorBecomesCoordinator() throws Exception {
disconnectAllFromDS();
final String expected = "java.net.ConnectException";
final String addExpected =
@@ -1189,7 +1196,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
props.setProperty(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "true");
props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(props);
SerializableRunnable connect =
new SerializableRunnable("Connect to " + locators) {
public void run() {
@@ -1301,7 +1308,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
/**
* Tests starting multiple locators in multiple VMs.
*/
- public void testMultipleLocators() throws Exception {
+ public void ntestMultipleLocators() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
@@ -1323,7 +1330,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
dsProps.setProperty("locators", locators);
dsProps.setProperty("mcast-port", "0");
dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(dsProps);
vm0.invoke(new SerializableRunnable("Start locator on " + port1) {
public void run() {
File logFile = new File("");
@@ -1357,6 +1364,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
+ addDSProps(props);
DistributedSystem.connect(props);
}
};
@@ -1366,7 +1374,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
-
+ addDSProps(props);
system = (InternalDistributedSystem) DistributedSystem.connect(props);
WaitCriterion ev = new WaitCriterion() {
@@ -1417,7 +1425,6 @@ public class LocatorDUnitTest extends DistributedTestCase {
* end up only have 1 master.
* GEODE-870
*/
- @Category(FlakyTest.class) // GEODE-1150: random ports, disk pollution, waitForCriterion, time sensitive, eats exceptions (fixed several)
public void testMultipleLocatorsRestartingAtSameTime() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
@@ -1441,8 +1448,8 @@ public class LocatorDUnitTest extends DistributedTestCase {
dsProps.setProperty(DistributionConfig.LOCATORS_NAME, locators);
dsProps.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
dsProps.setProperty(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "true");
- dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ addDSProps(dsProps);
startLocatorSync(vm0, new Object[] { port1, dsProps });
startLocatorSync(vm1, new Object[] { port2, dsProps });
startLocatorSync(vm2, new Object[] { port3, dsProps });
@@ -1620,7 +1627,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
/**
* Tests starting multiple locators in multiple VMs.
*/
- public void testMultipleMcastLocators() throws Exception {
+ public void ntestMultipleMcastLocators() throws Exception {
disconnectAllFromDS();
IgnoredException.addIgnoredException("Could not stop Distribution Locator"); // shutdown timing issue in InternalLocator
Host host = Host.getHost(0);
@@ -1653,7 +1660,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-ttl", "0");
props.setProperty("enable-network-partition-detection", "true");
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
+ addDSProps(props);
Locator.startLocatorAndDS(port1, logFile, null, props);
} catch (IOException ex) {
com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port1, ex);
@@ -1671,6 +1678,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-ttl", "0");
props.setProperty("enable-network-partition-detection", "true");
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ addDSProps(props);
Locator.startLocatorAndDS(port2, logFile, null, props);
} catch (IOException ex) {
com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port2, ex);
@@ -1687,6 +1695,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("log-level", LogWriterUtils.getDUnitLogLevel());
props.setProperty("mcast-ttl", "0");
props.setProperty("enable-network-partition-detection", "true");
+ addDSProps(props);
DistributedSystem.connect(props);
}
};
@@ -1700,7 +1709,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("log-level", LogWriterUtils.getDUnitLogLevel());
props.setProperty("mcast-ttl", "0");
props.setProperty("enable-network-partition-detection", "true");
-
+ addDSProps(props);
system = (InternalDistributedSystem) DistributedSystem.connect(props);
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
@@ -1735,7 +1744,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
* Tests that a VM can connect to a locator that is hosted in its
* own VM.
*/
- public void testConnectToOwnLocator() throws Exception {
+ public void ntestConnectToOwnLocator() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
@@ -1752,6 +1761,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ addDSProps(props);
system = (InternalDistributedSystem) DistributedSystem.connect(props);
system.disconnect();
} finally {
@@ -1762,7 +1772,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
/**
* Tests that a single VM can NOT host multiple locators
*/
- public void testHostingMultipleLocators() throws Exception {
+ public void ntestHostingMultipleLocators() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
//VM vm = host.getVM(0);
@@ -1797,6 +1807,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
props.setProperty("mcast-port", "0");
props.setProperty("locators", locators);
props.setProperty("log-level", LogWriterUtils.getDUnitLogLevel());
+ addDSProps(props);
DistributedSystem.connect(props);
}
};
@@ -1819,7 +1830,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
*
* @since 4.1
*/
- public void testRestartLocator() throws Exception {
+ public void ntestRestartLocator() throws Exception {
disconnectAllFromDS();
port1 =
AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
@@ -1831,6 +1842,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
p.setProperty(DistributionConfig.LOCATORS_NAME, Host.getHost(0).getHostName() + "[" + port1 + "]");
p.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
p.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ addDSProps(p);
if (stateFile.exists()) {
stateFile.delete();
}
@@ -1916,6 +1928,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
Properties locProps = new Properties();
locProps.put("mcast-port", "0");
locProps.put("log-level", LogWriterUtils.getDUnitLogLevel());
+ addDSProps(locProps);
Locator.startLocatorAndDS(port, logFile, locProps);
} catch (IOException ex) {
com.gemstone.gemfire.test.dunit.Assert.fail("While starting locator on port " + port, ex);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java
new file mode 100755
index 0000000..37f14c3
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorUDPSecurityDUnitTest.java
@@ -0,0 +1,28 @@
+package com.gemstone.gemfire.distributed;
+
+import java.util.Properties;
+
+import org.junit.Test;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+
+public class LocatorUDPSecurityDUnitTest extends LocatorDUnitTest{
+
+ public LocatorUDPSecurityDUnitTest(String name) {
+ super(name);
+ }
+
+ @Test
+ public void testLoop() throws Exception {
+ for(int i=0; i < 2; i++) {
+ testMultipleLocatorsRestartingAtSameTime();
+ tearDown();
+ setUp();
+ }
+ }
+
+ @Override
+ protected void addDSProps(Properties p) {
+ p.setProperty(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 1e1724d..0d3b9fc 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -190,7 +190,7 @@ public class GMSJoinLeaveJUnitTest {
// simulate a response being received
InternalDistributedMember sender = mockMembers[2];
- FindCoordinatorResponse resp = new FindCoordinatorResponse(coordinator, sender);
+ FindCoordinatorResponse resp = new FindCoordinatorResponse(coordinator, sender, null, 0);
gmsJoinLeave.processMessage(resp);
// tell GMSJoinLeave that a unit test is running so it won't clear the
// responses collection
@@ -207,7 +207,7 @@ public class GMSJoinLeaveJUnitTest {
public void testProcessJoinMessageRejectOldMemberVersion() throws IOException {
initMocks();
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockOldMember, mockOldMember, null, -1));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockOldMember, mockOldMember, null, -1, 0));
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
@@ -230,7 +230,7 @@ public class GMSJoinLeaveJUnitTest {
when(authenticator.authenticate(mockMembers[0], credentials)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
when(services.getMessenger()).thenReturn(messenger);
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1, 0));
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
@@ -242,7 +242,7 @@ public class GMSJoinLeaveJUnitTest {
when(authenticator.authenticate(mockMembers[0], null)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
when(services.getMessenger()).thenReturn(messenger);
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], null, -1));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], null, -1, 0));
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
@@ -257,9 +257,22 @@ public class GMSJoinLeaveJUnitTest {
JoinResponseMessage[] joinResponse = gmsJoinLeave.getJoinResponseMessage();
- JoinResponseMessage jrm = new JoinResponseMessage();
+ JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], new byte[9], 233);
gmsJoinLeave.processMessage(jrm);
+ //this should NOT logs, this is just to inform member succesful joining
+ Assert.assertEquals(null, joinResponse[0]);
+
+ jrm = new JoinResponseMessage("rejected...", 0);
+ gmsJoinLeave.processMessage(jrm);
+ //this should log..
Assert.assertEquals(jrm, joinResponse[0]);
+
+ gmsJoinLeave.setJoinResponseMessage(null);
+
+ jrm = new JoinResponseMessage(mockMembers[0], new NetView(), 0);
+ gmsJoinLeave.processMessage(jrm);
+ //this should log..
+ Assert.assertEquals(jrm, joinResponse[0]);
}
/**
@@ -475,10 +488,10 @@ public class GMSJoinLeaveJUnitTest {
prepareAndInstallView(gmsJoinLeaveMemberId, createMemberList(gmsJoinLeaveMemberId,mockMembers[0]));
gmsJoinLeave.getView().add(mockMembers[1]);
GMSJoinLeaveTestHelper.becomeCoordinatorForTest(gmsJoinLeave);
- JoinRequestMessage msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1);
+ JoinRequestMessage msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1, 0);
msg.setSender(mockMembers[2]);
gmsJoinLeave.processMessage(msg);
- msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1);
+ msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1, 0);
msg.setSender(mockMembers[2]);
gmsJoinLeave.processMessage(msg);
@@ -872,7 +885,7 @@ public class GMSJoinLeaveJUnitTest {
initMocks(false);
System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
gmsJoinLeave.join();
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1, 0));
int viewRequests = gmsJoinLeave.getViewRequests().size();
assertTrue( "There should be 1 viewRequest but found " + viewRequests, viewRequests == 1);
@@ -1072,13 +1085,13 @@ public class GMSJoinLeaveJUnitTest {
initMocks(false);
HashSet<InternalDistributedMember> registrants = new HashSet<>();
registrants.add(mockMembers[0]);
- FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true);
+ FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true, null);
NetView view = createView();
- JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view);
+ JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view, 0);
TcpClientWrapper tcpClientWrapper = mock(TcpClientWrapper.class);
gmsJoinLeave.setTcpClientWrapper(tcpClientWrapper);
- FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1);
+ FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1, null, 0);
int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
when(tcpClientWrapper.sendCoordinatorFindRequest(new InetSocketAddress("localhost", 12345), fcreq, connectTimeout)).thenReturn(fcr);
callAsnyc(()->{gmsJoinLeave.installView(view);});
@@ -1099,14 +1112,14 @@ public class GMSJoinLeaveJUnitTest {
initMocks(false);
HashSet<InternalDistributedMember> registrants = new HashSet<>();
registrants.add(mockMembers[0]);
- FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true);
+ FindCoordinatorResponse fcr = new FindCoordinatorResponse(mockMembers[0], mockMembers[0], false, null, registrants, false, true, null);
NetView view = createView();
- JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view);
+ JoinResponseMessage jrm = new JoinResponseMessage(mockMembers[0], view, 0);
gmsJoinLeave.setJoinResponseMessage(jrm);
TcpClientWrapper tcpClientWrapper = mock(TcpClientWrapper.class);
gmsJoinLeave.setTcpClientWrapper(tcpClientWrapper);
- FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1);
+ FindCoordinatorRequest fcreq = new FindCoordinatorRequest(gmsJoinLeaveMemberId, new HashSet<>(), -1, null, 0);
int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
//passing wrong port here, so ot will fail
when(tcpClientWrapper.sendCoordinatorFindRequest(new InetSocketAddress("localhost", 12346), fcreq, connectTimeout)).thenReturn(fcr);
@@ -1224,7 +1237,7 @@ public class GMSJoinLeaveJUnitTest {
}
private void processJoinMessage(InternalDistributedMember coordinator, InternalDistributedMember newMember, int port) {
- JoinRequestMessage reqMsg = new JoinRequestMessage(coordinator, newMember, null, port);
+ JoinRequestMessage reqMsg = new JoinRequestMessage(coordinator, newMember, null, port, 0);
gmsJoinLeave.processMessage(reqMsg);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
index 3a08faa..1d7b8d0 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
@@ -158,7 +158,7 @@ public class GMSEncryptJUnitTest {
initMocks();
GMSEncrypt gmsEncrypt1 = new GMSEncrypt(services, mockMembers[1]); // this will be the sender
- gmsEncrypt1.addClusterKey();
+ gmsEncrypt1.initClusterSecretKey();
// establish the public keys for the sender and receiver
netView.setPublicKey(mockMembers[1], gmsEncrypt1.getPublicKeyBytes());
@@ -182,7 +182,7 @@ public class GMSEncryptJUnitTest {
initMocks();
GMSEncrypt gmsEncrypt1 = new GMSEncrypt(services, mockMembers[1]); // this will be the sender
- gmsEncrypt1.addClusterKey();
+ gmsEncrypt1.initClusterSecretKey();
GMSEncrypt gmsEncrypt2 = new GMSEncrypt(services, mockMembers[2]); // this will be the sender
// establish the public keys for the sender and receiver
@@ -191,7 +191,7 @@ public class GMSEncryptJUnitTest {
gmsEncrypt1.installView(netView, mockMembers[1]);
- byte[] secretBytes = gmsEncrypt1.getSecretBytes();
+ byte[] secretBytes = gmsEncrypt1.getClusterSecretKey();
gmsEncrypt2.addClusterKey(secretBytes);
gmsEncrypt2.installView(netView, mockMembers[1]);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 60e790b..14a0df1 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -69,6 +70,9 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Healt
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
@@ -96,10 +100,14 @@ public class JGroupsMessengerJUnitTest {
private InterceptUDP interceptor;
private long statsId = 123;
+ private void initMocks(boolean enableMcast) throws Exception {
+ initMocks(enableMcast, new Properties());
+ }
+
/**
* Create stub and mock objects
*/
- private void initMocks(boolean enableMcast) throws Exception {
+ private void initMocks(boolean enableMcast, Properties addProp) throws Exception {
if (messenger != null) {
messenger.stop();
messenger = null;
@@ -112,6 +120,7 @@ public class JGroupsMessengerJUnitTest {
nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[10344]");
nonDefault.put(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "1");
+ nonDefault.putAll(addProp);
DistributionConfigImpl config = new DistributionConfigImpl(nonDefault);
RemoteTransportConfig tconfig = new RemoteTransportConfig(config,
DistributionManager.NORMAL_DM_TYPE);
@@ -134,6 +143,7 @@ public class JGroupsMessengerJUnitTest {
when(services.getHealthMonitor()).thenReturn(healthMonitor);
when(services.getManager()).thenReturn(manager);
when(services.getJoinLeave()).thenReturn(joinLeave);
+
DM dm = mock(DM.class);
InternalDistributedSystem system = InternalDistributedSystem.newInstanceForTesting(dm, nonDefault);
when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));
@@ -141,6 +151,9 @@ public class JGroupsMessengerJUnitTest {
messenger = new JGroupsMessenger();
messenger.init(services);
+ //if I do this earlier then test this return messenger as null
+ when(services.getMessenger()).thenReturn(messenger);
+
String jgroupsConfig = messenger.getJGroupsStackConfig();
int startIdx = jgroupsConfig.indexOf("<com");
int insertIdx = jgroupsConfig.indexOf('>', startIdx+4) + 1;
@@ -429,15 +442,15 @@ public class JGroupsMessengerJUnitTest {
when(joinLeave.getView()).thenReturn(v);
InternalDistributedMember sender = createAddress(8888);
- JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
+ JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
- Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, null, Version.CURRENT_ORDINAL);
+ Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL);
interceptor.up(new Event(Event.MSG, jmsg));
verify(mh, times(1)).processMessage(any(JoinRequestMessage.class));
LeaveRequestMessage lmsg = new LeaveRequestMessage(messenger.localAddress, sender, "testing");
- jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, null, Version.CURRENT_ORDINAL);
+ jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, Version.CURRENT_ORDINAL);
interceptor.up(new Event(Event.MSG, jmsg));
verify(manager).processMessage(any(LeaveRequestMessage.class));
@@ -469,7 +482,7 @@ public class JGroupsMessengerJUnitTest {
NetView v = new NetView(sender);
when(joinLeave.getView()).thenReturn(v);
messenger.installView(v);
- JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
+ JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1, 0);
if (mcastMsg) {
msg.setMulticast(true);
}
@@ -481,7 +494,7 @@ public class JGroupsMessengerJUnitTest {
sentMessages == 1);
// send a big message and expect fragmentation
- msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1);
+ msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1, 0);
// configure an incoming message handler for JoinRequestMessage
final DistributionMessage[] messageReceived = new DistributionMessage[1];
@@ -697,7 +710,7 @@ public class JGroupsMessengerJUnitTest {
NetView view = new NetView(mbr);
// the digest should be set in an outgoing join response
- JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view);
+ JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view, 0);
messenger.filterOutgoingMessage(joinResponse);
assertNotNull(joinResponse.getMessengerData());
@@ -709,7 +722,7 @@ public class JGroupsMessengerJUnitTest {
assertNull(joinResponse.getMessengerData());
// the digest shouldn't be set in an outgoing rejection message
- joinResponse = new JoinResponseMessage("you can't join my distributed system. nyah nyah nyah!");
+ joinResponse = new JoinResponseMessage("you can't join my distributed system. nyah nyah nyah!", 0);
messenger.filterOutgoingMessage(joinResponse);
assertNull(joinResponse.getMessengerData());
@@ -805,7 +818,7 @@ public class JGroupsMessengerJUnitTest {
dmsg.setRecipients(recipients);
// a message is ignored during manager shutdown
- msg = messenger.createJGMessage(dmsg, new JGAddress(other), null, Version.CURRENT_ORDINAL);
+ msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
receiver.receive(msg);
verify(manager, never()).processMessage(isA(DistributionMessage.class));
@@ -896,6 +909,174 @@ public class JGroupsMessengerJUnitTest {
assertFalse(AvailablePort.isPortAvailable(services.getConfig().getDistributionConfig().getMcastPort(), AvailablePort.MULTICAST));
}
+ private NetView createView(InternalDistributedMember otherMbr) {
+ InternalDistributedMember sender = messenger.getMemberID();
+ List<InternalDistributedMember> mbrs = new ArrayList<>();
+ mbrs.add(sender);
+ mbrs.add(otherMbr);
+ NetView v = new NetView(sender, 1, mbrs);
+ return v;
+ }
+
+ @Test
+ public void testEncryptedFindCoordinatorRequest() throws Exception{
+ InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+
+ Properties p = new Properties();
+ p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+ initMocks(false, p);
+
+ NetView v = createView(otherMbr);
+
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+
+ messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+ messenger.initClusterKey();
+
+ FindCoordinatorRequest gfmsg = new FindCoordinatorRequest(messenger.getMemberID(), new ArrayList<InternalDistributedMember>(2), 1, messenger.getPublickey(messenger.getMemberID()), 1);
+ Set<InternalDistributedMember> recipients = new HashSet<>();
+ recipients.add(otherMbr);
+ gfmsg.setRecipients(recipients);
+
+ short version = Version.CURRENT_ORDINAL;
+
+ HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+
+ messenger.writeEncryptedMessage(gfmsg, version, out);
+
+ byte[] requestBytes = out.toByteArray();
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+
+ DistributionMessage distributionMessage = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+
+ assertEquals(gfmsg, distributionMessage);
+ }
+
+ @Test
+ public void testEncryptedFindCoordinatorResponse() throws Exception{
+ InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+
+ Properties p = new Properties();
+ p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+ initMocks(false, p);
+
+ NetView v = createView(otherMbr);
+
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+ otherMbrEncrptor.setPublicKey(messenger.getPublickey(messenger.getMemberID()), messenger.getMemberID());
+
+ messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+ messenger.initClusterKey();
+
+ FindCoordinatorResponse gfmsg = new FindCoordinatorResponse(messenger.getMemberID(), messenger.getMemberID(), messenger.getClusterSecretKey(), 1);
+ Set<InternalDistributedMember> recipients = new HashSet<>();
+ recipients.add(otherMbr);
+ gfmsg.setRecipients(recipients);
+
+ short version = Version.CURRENT_ORDINAL;
+
+ HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+
+ messenger.writeEncryptedMessage(gfmsg, version, out);
+
+ byte[] requestBytes = out.toByteArray();
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+
+ messenger.addRequestId(1, messenger.getMemberID());
+
+ DistributionMessage distributionMessage = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+
+ assertEquals(gfmsg, distributionMessage);
+ }
+
+ @Test
+ public void testEncryptedJoinRequest() throws Exception{
+ InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+
+ Properties p = new Properties();
+ p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+ initMocks(false, p);
+
+ NetView v = createView(otherMbr);
+
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+
+ messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+ messenger.initClusterKey();
+
+ JoinRequestMessage gfmsg = new JoinRequestMessage(otherMbr, messenger.getMemberID(), null, 9789, 1);
+
+ short version = Version.CURRENT_ORDINAL;
+
+ HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+
+ messenger.writeEncryptedMessage(gfmsg, version, out);
+
+ byte[] requestBytes = out.toByteArray();
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+
+ DistributionMessage distributionMessage = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+
+ assertEquals(gfmsg, distributionMessage);
+ }
+
+ @Test
+ public void testEncryptedJoinResponse() throws Exception{
+ InternalDistributedMember otherMbr = new InternalDistributedMember("localhost", 8888);
+
+ Properties p = new Properties();
+ p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
+ initMocks(false, p);
+
+ NetView v = createView(otherMbr);
+
+ GMSEncrypt otherMbrEncrptor = new GMSEncrypt(services);
+ otherMbrEncrptor.setPublicKey(messenger.getPublickey(messenger.getMemberID()), messenger.getMemberID());
+
+ messenger.setPublicKey(otherMbrEncrptor.getPublicKeyBytes(), otherMbr);
+ messenger.initClusterKey();
+
+ JoinResponseMessage gfmsg = new JoinResponseMessage(otherMbr, messenger.getClusterSecretKey(), 1);
+
+ short version = Version.CURRENT_ORDINAL;
+
+ HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
+
+ messenger.writeEncryptedMessage(gfmsg, version, out);
+
+ byte[] requestBytes = out.toByteArray();
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+
+ messenger.addRequestId(1, messenger.getMemberID());
+
+ DistributionMessage gfMessageAtOtherMbr = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+
+ assertEquals(gfmsg, gfMessageAtOtherMbr);
+
+ //lets send view as well..
+
+ InstallViewMessage installViewMessage = new InstallViewMessage(v, null, true);
+
+ out = new HeapDataOutputStream(Version.CURRENT);
+
+ messenger.writeEncryptedMessage(installViewMessage, version, out);
+
+ requestBytes = out.toByteArray();
+
+ otherMbrEncrptor.addClusterKey(((JoinResponseMessage)gfMessageAtOtherMbr).getSecretPk());
+
+ dis = new DataInputStream(new ByteArrayInputStream(requestBytes));
+
+ gfMessageAtOtherMbr = messenger.readEncryptedMessage(dis, version, otherMbrEncrptor);
+
+ assertEquals(installViewMessage, gfMessageAtOtherMbr);
+
+ }
+
/**
* creates an InternalDistributedMember address that can be used
* with the doctored JGroups channel. This includes a logical
[2/3] incubator-geode git commit: GEODE-1372 added unit test and some
more fixes.
Posted by hi...@apache.org.
GEODE-1372 added unit test and some more fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b6a73441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b6a73441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b6a73441
Branch: refs/heads/feature/GEODE-1372
Commit: b6a734415d1bec96c1f84a51b1fca16165bab0ae
Parents: 4d5f947
Author: Hitesh Khamesra <hi...@yahoo.com>
Authored: Wed Jun 1 15:27:24 2016 -0700
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Wed Jun 1 15:27:24 2016 -0700
----------------------------------------------------------------------
.../internal/membership/NetView.java | 4 +-
.../membership/gms/interfaces/Messenger.java | 12 +
.../gms/locator/FindCoordinatorRequest.java | 22 +-
.../gms/locator/FindCoordinatorResponse.java | 70 ++-
.../membership/gms/locator/GMSLocator.java | 12 +-
.../membership/gms/membership/GMSJoinLeave.java | 151 +++++--
.../gms/messages/InstallViewMessage.java | 25 ++
.../gms/messages/JoinRequestMessage.java | 49 ++-
.../gms/messages/JoinResponseMessage.java | 63 ++-
.../membership/gms/messenger/GMSEncrypt.java | 82 ++--
.../gms/messenger/JGroupsMessenger.java | 431 +++++++++++++------
.../internal/InternalDataSerializer.java | 18 +
.../DistributedMulticastRegionDUnitTest.java | 2 +
.../gemfire/distributed/LocatorDUnitTest.java | 77 ++--
.../LocatorUDPSecurityDUnitTest.java | 28 ++
.../gms/membership/GMSJoinLeaveJUnitTest.java | 43 +-
.../gms/messenger/GMSEncryptJUnitTest.java | 6 +-
.../messenger/JGroupsMessengerJUnitTest.java | 199 ++++++++-
18 files changed, 1010 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index 365a193..92fbcac 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -586,7 +586,7 @@ public class NetView implements DataSerializableFixedID {
InternalDataSerializer.writeSet(crashedMembers, out);
DataSerializer.writeIntArray(failureDetectionPorts, out);
// TODO expensive serialization
- DataSerializer.writeObject(publicKeys, out);
+ DataSerializer.writeHashMap(publicKeys, out);
}
@Override
@@ -598,7 +598,7 @@ public class NetView implements DataSerializableFixedID {
shutdownMembers = InternalDataSerializer.readHashSet(in);
crashedMembers = InternalDataSerializer.readHashSet(in);
failureDetectionPorts = DataSerializer.readIntArray(in);
- publicKeys = DataSerializer.readObject(in);
+ publicKeys = DataSerializer.readHashMap(in);
}
/** this will deserialize as an ArrayList */
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index e10f325..3e9a2dc 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -86,4 +86,16 @@ public interface Messenger extends Service {
* @param state the state of that member's outgoing messaging to this member
*/
void waitForMessageState(InternalDistributedMember member, Map state) throws InterruptedException;
+
+ byte[] getPublickey(InternalDistributedMember mbr);
+
+ void setPublicKey(byte[] publickey, InternalDistributedMember mbr);
+
+ void setClusterSecretKey(byte[] clusterSecretKey);
+
+ byte[] getClusterSecretKey();
+
+ int getRequestId();
+
+ void initClusterKey();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
index 5c0a1d1..c434c25 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.Version;
public class FindCoordinatorRequest extends HighPriorityDistributionMessage
@@ -35,15 +36,20 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
private InternalDistributedMember memberID;
private Collection<InternalDistributedMember> rejectedCoordinators;
private int lastViewId;
-
+ private byte[] myPublicKey;
+ private int requestId;
+
public FindCoordinatorRequest(InternalDistributedMember myId) {
this.memberID = myId;
}
- public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, int lastViewId) {
+ public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators,
+ int lastViewId, byte[] pk, int requestId) {
this.memberID = myId;
this.rejectedCoordinators = rejectedCoordinators;
this.lastViewId = lastViewId;
+ this.myPublicKey = pk;
+ this.requestId = requestId;
}
public FindCoordinatorRequest() {
@@ -54,6 +60,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
return memberID;
}
+ public byte[] getMyPublicKey() {
+ return myPublicKey;
+ }
+
public Collection<InternalDistributedMember> getRejectedCoordinators() {
return rejectedCoordinators;
}
@@ -81,6 +91,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
public int getDSFID() {
return FIND_COORDINATOR_REQ;
}
+
+ public int getRequestId() {
+ return requestId;
+ }
@Override
public void toData(DataOutput out) throws IOException {
@@ -94,6 +108,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
out.writeInt(0);
}
out.writeInt(lastViewId);
+ out.writeInt(requestId);
+ InternalDataSerializer.writeByteArray(this.myPublicKey, out);
}
@Override
@@ -105,6 +121,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
this.rejectedCoordinators.add((InternalDistributedMember)DataSerializer.readObject(in));
}
this.lastViewId = in.readInt();
+ this.requestId = in.readInt();
+ this.myPublicKey = InternalDataSerializer.readByteArray(in);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 0427cb4..07f0e58 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -19,6 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.locator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -42,12 +43,14 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage
private boolean networkPartitionDetectionEnabled;
private boolean usePreferredCoordinators;
private boolean isShortForm;
-
+ private byte[] coordinatorPublicKey;
+
+ private int requestId;
public FindCoordinatorResponse(InternalDistributedMember coordinator,
InternalDistributedMember senderId,
boolean fromView, NetView view, HashSet<InternalDistributedMember> registrants,
- boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators) {
+ boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators, byte[] pk) {
this.coordinator = coordinator;
this.senderId = senderId;
this.fromView = fromView;
@@ -56,19 +59,30 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage
this.networkPartitionDetectionEnabled = networkPartitionDectionEnabled;
this.usePreferredCoordinators = usePreferredCoordinators;
this.isShortForm = false;
+ this.coordinatorPublicKey = pk;
}
public FindCoordinatorResponse(InternalDistributedMember coordinator,
- InternalDistributedMember senderId) {
+ InternalDistributedMember senderId, byte[] pk, int requestId) {
this.coordinator = coordinator;
this.senderId = senderId;
this.isShortForm = true;
+ this.coordinatorPublicKey = pk;
+ this.requestId = requestId;
}
public FindCoordinatorResponse() {
// no-arg constructor for serialization
}
+ public byte[] getCoordinatorPublicKey() {
+ return coordinatorPublicKey;
+ }
+
+ public int getRequestId() {
+ return requestId;
+ }
+
public boolean isNetworkPartitionDetectionEnabled() {
return networkPartitionDetectionEnabled;
}
@@ -131,6 +145,7 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage
public void toData(DataOutput out) throws IOException {
DataSerializer.writeObject(coordinator, out);
DataSerializer.writeObject(senderId, out);
+ InternalDataSerializer.writeByteArray(coordinatorPublicKey, out);
out.writeBoolean(isShortForm);
out.writeBoolean(fromView);
out.writeBoolean(networkPartitionDetectionEnabled);
@@ -143,7 +158,8 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
coordinator = DataSerializer.readObject(in);
senderId = DataSerializer.readObject(in);
- isShortForm = in.readBoolean();
+ coordinatorPublicKey = InternalDataSerializer.readByteArray(in);
+ isShortForm = in.readBoolean();
if (!isShortForm) {
fromView = in.readBoolean();
networkPartitionDetectionEnabled = in.readBoolean();
@@ -158,4 +174,50 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage
throw new IllegalStateException("this message should not be executed");
}
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FindCoordinatorResponse other = (FindCoordinatorResponse) obj;
+ if (coordinator == null) {
+ if (other.coordinator != null)
+ return false;
+ } else if (!coordinator.equals(other.coordinator))
+ return false;
+ if (!Arrays.equals(coordinatorPublicKey, other.coordinatorPublicKey))
+ return false;
+ if (fromView != other.fromView)
+ return false;
+ if (isShortForm != other.isShortForm)
+ return false;
+ if (networkPartitionDetectionEnabled != other.networkPartitionDetectionEnabled)
+ return false;
+ if (registrants == null) {
+ if (other.registrants != null)
+ return false;
+ } else if (!registrants.equals(other.registrants))
+ return false;
+ //as we are not sending requestId as part of FinDCoordinator resposne
+ /*if (requestId != other.requestId)
+ return false;*/
+ if (senderId == null) {
+ if (other.senderId != null)
+ return false;
+ } else if (!senderId.equals(other.senderId))
+ return false;
+ if (usePreferredCoordinators != other.usePreferredCoordinators)
+ return false;
+ if (view == null) {
+ if (other.view != null)
+ return false;
+ } else if (!view.equals(other.view))
+ return false;
+ return true;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
index aab9002..402940b 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -167,7 +167,7 @@ public class GMSLocator implements Locator, NetLocator {
}
} else if (request instanceof FindCoordinatorRequest) {
FindCoordinatorRequest findRequest = (FindCoordinatorRequest)request;
-
+ services.getMessenger().setPublicKey(findRequest.getMyPublicKey(), findRequest.getMemberID());
if (findRequest.getMemberID() != null) {
InternalDistributedMember coord = null;
@@ -228,9 +228,17 @@ public class GMSLocator implements Locator, NetLocator {
}
synchronized(registrants) {
+ byte[] coordPk = null;
+ if(view != null) {
+ coordPk = (byte[])view.getPublicKey(coord);
+ }
+ if (coordPk == null) {
+ coordPk = services.getMessenger().getPublickey(coord);
+ }
response = new FindCoordinatorResponse(coord, localAddress,
fromView, view, new HashSet<InternalDistributedMember>(registrants),
- this.networkPartitionDetectionEnabled, this.usePreferredCoordinators);
+ this.networkPartitionDetectionEnabled, this.usePreferredCoordinators,
+ coordPk);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index d70884a..3cd634a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -39,6 +39,7 @@ import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.security.AuthenticationFailedException;
+
import org.apache.logging.log4j.Logger;
import java.io.IOException;
@@ -361,8 +362,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
} else {
logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
int port = services.getHealthMonitor().getFailureDetectionPort();
- JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port);
- services.getMessenger().send(req, state.view);
+ JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port,
+ services.getMessenger().getRequestId());
+ //services.getMessenger().send(req, state.view);
+ services.getMessenger().send(req);
}
JoinResponseMessage response = null;
@@ -414,6 +417,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
NetView v = response.getCurrentView();
InternalDistributedMember coord = v.getCoordinator();
if (searchState.alreadyTried.contains(coord)) {
+ searchState.view = response.getCurrentView();
// we already sent join request to it..so lets wait some more time here
// assuming we got this response immediately, so wait for same timeout here..
long timeout = Math.max(services.getConfig().getMemberTimeout(), services.getConfig().getJoinTimeout() / 5);
@@ -421,7 +425,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
response = joinResponse[0];
} else {
// try on this coordinator
- searchState.possibleCoordinator = coord;
+ searchState.view = response.getCurrentView();
response = null;
}
searchState.view = v;
@@ -467,7 +471,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
logger.warn("detected an attempt to start a peer using an older version of the product {}", incomingRequest.getMemberID());
- JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
+ JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version", incomingRequest.getRequestId());
m.setRecipient(incomingRequest.getMemberID());
services.getMessenger().send(m);
return;
@@ -480,7 +484,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
rejection = e.getMessage();
}
if (rejection != null && rejection.length() > 0) {
- JoinResponseMessage m = new JoinResponseMessage(rejection);
+ JoinResponseMessage m = new JoinResponseMessage(rejection, 0);
m.setRecipient(incomingRequest.getMemberID());
services.getMessenger().send(m);
return;
@@ -632,13 +636,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.debug("JoinLeave is recording the request to be processed in the next membership view");
synchronized (viewRequests) {
viewRequests.add(request);
+ if (viewCreator != null) {
+ boolean joinResponseSent = viewCreator.informToPendingJoinRequests();
+
+ if (!joinResponseSent && request instanceof JoinRequestMessage) {
+ JoinRequestMessage jreq = (JoinRequestMessage) request;
+ // this will inform about cluster-secret key, as we have authenticated at this point
+ JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(), services.getMessenger().getClusterSecretKey(), jreq.getRequestId());
+ services.getMessenger().send(response);
+ }
+ }
viewRequests.notifyAll();
}
- if (viewCreator != null) {
- viewCreator.informToPendingJoinRequests();
- }
+
}
-
+
// for testing purposes, returns a copy of the view requests for verification
List<DistributionMessage> getViewRequests() {
synchronized (viewRequests) {
@@ -710,6 +722,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (newView != null) {
viewCreator.setInitialView(newView, newView.getNewMembers(), newView.getShutdownMembers(), newView.getCrashedMembers());
}
+ services.getMessenger().initClusterKey();
viewCreator.setDaemon(true);
logger.info("ViewCreator starting on:" + localAddress);
viewCreator.start();
@@ -752,7 +765,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void sendJoinResponses(NetView newView, List<InternalDistributedMember> newMbrs) {
for (InternalDistributedMember mbr : newMbrs) {
- JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+ JoinResponseMessage response = new JoinResponseMessage(mbr, newView, 0);
services.getMessenger().send(response);
}
}
@@ -770,14 +783,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
- for (InternalDistributedMember mbr : newMbrs) {
- JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
- services.getMessenger().send(response);
- }
- }
-
-
boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException {
return sendView(view, newMembers, true, this.prepareProcessor);
}
@@ -841,7 +846,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
pendingRemovals.removeAll(view.getCrashedMembers());
viewReplyProcessor.initialize(id, responders);
viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals);
- services.getMessenger().send(msg, view);
+ addPublickeysToView(view);
+ services.getMessenger().send(msg);
// only wait for responses during preparation
if (preparing) {
@@ -868,13 +874,32 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return true;
}
+ private void addPublickeysToView(NetView view) {
+ //TODO: is this check is correct
+ if (services != null && services.getConfig() != null && services.getConfig().getDistributionConfig() != null) {
+ String sDHAlgo = services.getConfig().getDistributionConfig().getSecurityClientDHAlgo();
+ if (sDHAlgo != null && !sDHAlgo.isEmpty()) {
+ List<InternalDistributedMember> mbrs = view.getMembers();
+ Iterator<InternalDistributedMember> itr = mbrs.iterator();
+
+ while (itr.hasNext()) {
+ InternalDistributedMember mbr = itr.next();
+ byte[] pk = services.getMessenger().getPublickey(mbr);
+ view.setPublicKey(mbr, pk);
+ }
+ }
+ }
+ }
private void processViewMessage(final InstallViewMessage m) {
NetView view = m.getView();
if(currentView != null && !currentView.contains(m.getSender())) {
- logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView);
- return;
+ if(this.preparedView == null || !this.preparedView.contains(m.getSender()))
+ {
+ logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView);
+ return;
+ }
}
if (currentView != null && view.getViewId() < currentView.getViewId()) {
@@ -927,7 +952,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void ackView(InstallViewMessage m) {
if (!playingDead && m.getView().contains(m.getView().getCreator())) {
- services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()), m.getView());
+ services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
}
}
@@ -968,7 +993,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return findCoordinatorFromView();
}
- FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
+ FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId,
+ services.getMessenger().getPublickey(localAddress), services.getMessenger().getRequestId());
Set<InternalDistributedMember> possibleCoordinators = new HashSet<InternalDistributedMember>();
Set<InternalDistributedMember> coordinatorsWithView = new HashSet<InternalDistributedMember>();
@@ -988,6 +1014,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout);
FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null;
if (response != null) {
+ setCoordinatorPublicKey(response);
state.locatorsContacted++;
if (!state.hasContactedAJoinedLocator &&
response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
@@ -1084,16 +1111,35 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (state.registrants != null) {
recipients.addAll(state.registrants);
}
- recipients.remove(localAddress);
- FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId);
- req.setRecipients(v.getMembers());
+ recipients.remove(localAddress);
+ // FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+ // localAddress), services.getMessenger().getRequestId());
+ //req.setRecipients(v.getMembers());
+
boolean testing = unitTesting.contains("findCoordinatorFromView");
synchronized (state.responses) {
if (!testing) {
state.responses.clear();
}
- services.getMessenger().send(req);
+
+ if (!services.getConfig().getDistributionConfig().getSecurityClientDHAlgo().isEmpty()) {
+ for (InternalDistributedMember mbr : v.getMembers()) {
+ Set<InternalDistributedMember> r = new HashSet<>();
+ r.add(mbr);
+ FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+ localAddress), services.getMessenger().getRequestId());
+ req.setRecipients(r);
+
+ services.getMessenger().send(req, v);
+ }
+ } else {
+ FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+ localAddress), services.getMessenger().getRequestId());
+ req.setRecipients(v.getMembers());
+
+ services.getMessenger().send(req, v);
+ }
try {
if (!testing) {
state.responses.wait(DISCOVERY_TIMEOUT);
@@ -1144,8 +1190,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void processJoinResponse(JoinResponseMessage rsp) {
synchronized (joinResponse) {
if (!this.isJoined) {
- joinResponse[0] = rsp;
- joinResponse.notifyAll();
+ //1. our joinRequest rejected.
+ //2. Member which was coordinator but just now some other member became coordinator
+ //3. we got message with secret key, but still view is coming and that will inform the joining thread
+ if (rsp.getRejectionMessage() != null || rsp.getCurrentView() != null) {
+ joinResponse[0] = rsp;
+ joinResponse.notifyAll();
+ } else {
+ //we got secret key lets add it
+ services.getMessenger().setClusterSecretKey(rsp.getSecretPk());
+ }
}
}
}
@@ -1170,9 +1224,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
FindCoordinatorResponse resp;
if (this.isJoined) {
NetView v = currentView;
- resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress);
+ resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress,
+ services.getMessenger().getPublickey(v.getCoordinator()), req.getRequestId());
} else {
- resp = new FindCoordinatorResponse(localAddress, localAddress);
+ resp = new FindCoordinatorResponse(localAddress, localAddress,
+ services.getMessenger().getPublickey(localAddress), req.getRequestId());
}
resp.setRecipient(req.getMemberID());
services.getMessenger().send(resp);
@@ -1183,6 +1239,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
searchState.responses.add(resp);
searchState.responses.notifyAll();
}
+ setCoordinatorPublicKey(resp);
+ }
+
+ private void setCoordinatorPublicKey(FindCoordinatorResponse response) {
+ if (response.getCoordinator() != null && response.getCoordinatorPublicKey() != null)
+ services.getMessenger().setPublicKey(response.getCoordinatorPublicKey(), response.getCoordinator());
}
private void processNetworkPartitionMessage(NetworkPartitionMessage msg) {
@@ -1993,36 +2055,40 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- synchronized void informToPendingJoinRequests() {
+ synchronized boolean informToPendingJoinRequests() {
+ boolean joinResponseSent = false;
if (!shutdown) {
- return;
+ return joinResponseSent;
}
-
ArrayList<DistributionMessage> requests = new ArrayList<>();
synchronized (viewRequests) {
if (viewRequests.size() > 0) {
requests.addAll(viewRequests);
} else {
- return;
+ return joinResponseSent;
}
viewRequests.clear();
}
+
for (DistributionMessage msg : requests) {
switch (msg.getDSFID()) {
case JOIN_REQUEST:
- logger.info("Informing to pending join requests {}", msg);
-
+
NetView v = currentView;
+ logger.info("Informing to pending join requests {} myid {} coord {}", msg, localAddress, v.getCoordinator());
if (!v.getCoordinator().equals(localAddress)) {
+ joinResponseSent = true;
//lets inform that coordinator has been changed
- JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v);
+ JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v, ((JoinRequestMessage) msg).getRequestId());
services.getMessenger().send(jrm);
}
default:
break;
}
}
+
+ return joinResponseSent;
}
/**
@@ -2033,7 +2099,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
void createAndSendView(List<DistributionMessage> requests) throws InterruptedException {
List<InternalDistributedMember> joinReqs = new ArrayList<>(10);
Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10);
- Map<InternalDistributedMember, Object> joinKeys = new HashMap<>(10);
Set<InternalDistributedMember> leaveReqs = new HashSet<>(10);
List<InternalDistributedMember> removalReqs = new ArrayList<>(10);
List<String> removalReasons = new ArrayList<String>(10);
@@ -2067,7 +2132,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!joinReqs.contains(mbr)) {
joinReqs.add(mbr);
joinPorts.put(mbr, port);
- joinKeys.put(mbr, jmsg.getPublicKey());
}
break;
case LEAVE_REQUEST_MESSAGE:
@@ -2137,7 +2201,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
for (InternalDistributedMember mbr : joinReqs) {
if (mbrs.contains(mbr)) {
newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
- newView.setPublicKey(mbr, joinKeys.get(mbr));
}
}
if (currentView != null) {
@@ -2161,7 +2224,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
- sendJoinResponses(joinReqs, newView);
+ //we already sent whrn we got join request
+ //sendJoinResponses(newView, joinReqs);
// send removal messages before installing the view so we stop
// getting messages from members that have been kicked out
@@ -2289,7 +2353,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// we also send a join response so that information like the multicast message digest
// can be transmitted to the new members w/o including it in the view message
- sendJoinResponses(newView, joinReqs);
+ //we already sent whrn we got join request
+ //sendJoinResponses(newView, joinReqs);
if (markViewCreatorForShutdown && getViewCreator() != null) {
shutdown = true;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
index c41584f..224fef1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
@@ -110,4 +110,29 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
+")";
}
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ InstallViewMessage other = (InstallViewMessage) obj;
+ if (credentials == null) {
+ if (other.credentials != null)
+ return false;
+ } else if (!credentials.equals(other.credentials))
+ return false;
+ if (kind != other.kind)
+ return false;
+ if (previousViewId != other.previousViewId)
+ return false;
+ if (view == null) {
+ if (other.view != null)
+ return false;
+ } else if (!view.equals(other.view))
+ return false;
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 5545935..b282daa 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -30,25 +30,25 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
private InternalDistributedMember memberID;
private Object credentials;
private int failureDetectionPort = -1;
- private Object publicKey;
-
+ private int requestId;
+
public JoinRequestMessage(InternalDistributedMember coord,
- InternalDistributedMember id, Object credentials, int fdPort) {
+ InternalDistributedMember id, Object credentials, int fdPort, int requestId) {
super();
setRecipient(coord);
this.memberID = id;
this.credentials = credentials;
- this.publicKey = null;
this.failureDetectionPort = fdPort;
+ this.requestId = requestId;
}
public JoinRequestMessage() {
// no-arg constructor for serialization
}
- public void setPublicKey(Object key) {
- this.publicKey = key;
+ public int getRequestId() {
+ return requestId;
}
-
+
@Override
public int getDSFID() {
return JOIN_REQUEST;
@@ -67,10 +67,6 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
return credentials;
}
- public Object getPublicKey() {
- return publicKey;
- }
-
@Override
public String toString() {
return getShortClassName() + "(" + memberID + (credentials==null? ")" : "; with credentials)") + " failureDetectionPort:" + failureDetectionPort;
@@ -85,24 +81,49 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
public void toData(DataOutput out) throws IOException {
DataSerializer.writeObject(memberID, out);
DataSerializer.writeObject(credentials, out);
- DataSerializer.writeObject(publicKey, out);
DataSerializer.writePrimitiveInt(failureDetectionPort, out);
// preserve the multicast setting so the receiver can tell
// if this is a mcast join request
out.writeBoolean(getMulticast());
+ out.writeInt(requestId);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
memberID = DataSerializer.readObject(in);
credentials = DataSerializer.readObject(in);
- publicKey = DataSerializer.readObject(in);
failureDetectionPort = DataSerializer.readPrimitiveInt(in);
setMulticast(in.readBoolean());
+ requestId = in.readInt();
}
public int getFailureDetectionPort() {
return failureDetectionPort;
}
-
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ JoinRequestMessage other = (JoinRequestMessage) obj;
+ if (credentials == null) {
+ if (other.credentials != null)
+ return false;
+ } else if (!credentials.equals(other.credentials))
+ return false;
+ if (failureDetectionPort != other.failureDetectionPort)
+ return false;
+ if (memberID == null) {
+ if (other.memberID != null)
+ return false;
+ } else if (!memberID.equals(other.memberID))
+ return false;
+ if (requestId != other.requestId)
+ return false;
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index ad9c319..ff20a4e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -37,20 +38,39 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
private String rejectionMessage;
private InternalDistributedMember memberID;
private byte[] messengerData;
-
- public JoinResponseMessage(InternalDistributedMember memberID, NetView view) {
+ private int requestId;
+ private byte[] secretPk;
+
+ public JoinResponseMessage(InternalDistributedMember memberID, NetView view, int requestId) {
this.currentView = view;
this.memberID = memberID;
+ this.requestId = requestId;
setRecipient(memberID);
}
- public JoinResponseMessage(String rejectionMessage) {
+ public JoinResponseMessage(InternalDistributedMember memberID, byte[] sPk, int requestId) {
+ this.memberID = memberID;
+ this.requestId = requestId;
+ this.secretPk = sPk;
+ setRecipient(memberID);
+ }
+
+ public JoinResponseMessage(String rejectionMessage, int requestId) {
this.rejectionMessage = rejectionMessage;
+ this.requestId = requestId;
}
public JoinResponseMessage() {
// no-arg constructor for serialization
}
+
+ public byte[] getSecretPk() {
+ return secretPk;
+ }
+
+ public int getRequestId() {
+ return requestId;
+ }
public NetView getCurrentView() {
return currentView;
@@ -101,6 +121,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
DataSerializer.writeObject(memberID, out);
DataSerializer.writeString(rejectionMessage, out);
DataSerializer.writeByteArray(messengerData, out);
+ DataSerializer.writeByteArray(secretPk, out);
}
@Override
@@ -109,6 +130,42 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
memberID = DataSerializer.readObject(in);
rejectionMessage = DataSerializer.readString(in);
messengerData = DataSerializer.readByteArray(in);
+ secretPk = DataSerializer.readByteArray(in);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ JoinResponseMessage other = (JoinResponseMessage) obj;
+ if (currentView == null) {
+ if (other.currentView != null)
+ return false;
+ } else if (!currentView.equals(other.currentView))
+ return false;
+ if (memberID == null) {
+ if (other.memberID != null)
+ return false;
+ } else if (!memberID.equals(other.memberID))
+ return false;
+ if (!Arrays.equals(messengerData, other.messengerData))
+ return false;
+ if (rejectionMessage == null) {
+ if (other.rejectionMessage != null)
+ return false;
+ } else if (!rejectionMessage.equals(other.rejectionMessage))
+ return false;
+ //as we are not sending as part of JoinResposne
+ /*if (requestId != other.requestId)
+ return false;*/
+ if (!Arrays.equals(secretPk, other.secretPk))
+ return false;
+ return true;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
index f307290..047bb03 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -23,7 +23,6 @@ import java.security.spec.X509EncodedKeySpec;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.Cipher;
@@ -37,18 +36,14 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
-import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.logging.LogService;
public class GMSEncrypt implements Cloneable {
public static long encodingsPerformed;
public static long decodingsPerformed;
- private static final Logger logger = LogService.getLogger();
-
// Parameters for the Diffie-Hellman key exchange
private static final BigInteger dhP = new BigInteger("13528702063991073999718992897071702177131142188276542919088770094024269" + "73079899070080419278066109785292538223079165925365098181867673946"
+ "34756714063947534092593553024224277712367371302394452615862654308" + "11180902979719649450105660478776364198726078338308557022096810447" + "3500348898008043285865193451061481841186553");
@@ -74,27 +69,27 @@ public class GMSEncrypt implements Cloneable {
private ClusterEncryptor clusterEncryptor;
- protected void installView(NetView view) throws Exception {
+ protected void installView(NetView view) {
this.view = view;
this.view.setPublicKey(services.getJoinLeave().getMemberID(), getPublicKeyBytes());
}
- protected void installView(NetView view, InternalDistributedMember mbr) throws Exception {
+ protected void installView(NetView view, InternalDistributedMember mbr) {
this.view = view;
- // this.view.setPublicKey(mbr, getPublicKeyBytes());
- // TODO remove ciphers for departed members
- // addClusterKey();
}
- protected byte[] getSecretBytes() {
+ protected byte[] getClusterSecretKey() {
return this.clusterEncryptor.secretBytes;
}
- protected synchronized void addClusterKey() throws Exception {
- this.clusterEncryptor = new ClusterEncryptor(this);
+ protected synchronized void initClusterSecretKey() throws Exception {
+ if(this.clusterEncryptor == null) {
+ this.clusterEncryptor = new ClusterEncryptor(this);
+ }
}
- protected synchronized void addClusterKey(byte[] secretBytes) throws Exception {
+ protected synchronized void addClusterKey(byte[] secretBytes) {
+ //TODO we are reseeting here, in case there is some race
this.clusterEncryptor = new ClusterEncryptor(secretBytes);
}
@@ -124,6 +119,11 @@ public class GMSEncrypt implements Cloneable {
public byte[] decryptData(byte[] data) throws Exception {
return this.clusterEncryptor.decryptBytes(data);
}
+
+ public byte[] decryptData(byte[] data, byte[] pkBytes) throws Exception {
+ PeerEncryptor encryptor = new PeerEncryptor(pkBytes);
+ return encryptor.decryptBytes(data);
+ }
public byte[] encryptData(byte[] data) throws Exception {
return this.clusterEncryptor.encryptBytes(data);
@@ -132,6 +132,26 @@ public class GMSEncrypt implements Cloneable {
protected byte[] getPublicKeyBytes() {
return dhPublicKey.getEncoded();
}
+
+ protected byte[] getPublicKey(InternalDistributedMember member) {
+ try {
+ InternalDistributedMember localMbr = services.getMessenger().getMemberID();
+ if (localMbr != null && localMbr.equals(member)) {
+ return this.dhPublicKey.getEncoded();// local one
+ }
+ return getPeerEncryptor(member).peerPublicKey.getEncoded();
+ } catch (Exception e) {
+ throw new RuntimeException("Not found public key for member " + member, e);
+ }
+ }
+
+ protected void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
+ try {
+ createPeerEncryptor(mbr, publickey);
+ }catch(Exception e) {
+ throw new RuntimeException("Unable to create peer encryptor " + mbr, e);
+ }
+ }
@Override
protected GMSEncrypt clone() throws CloneNotSupportedException {
@@ -142,7 +162,6 @@ public class GMSEncrypt implements Cloneable {
X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(this.dhPublicKey.getEncoded());
KeyFactory keyFact = KeyFactory.getInstance("DH");
- // PublicKey pubKey = keyFact.generatePublic(x509KeySpec);
gmsEncrypt.dhPublicKey = keyFact.generatePublic(x509KeySpec);
final String format = this.dhPrivateKey.getFormat();
System.out.println("private key format " + format);
@@ -180,16 +199,20 @@ public class GMSEncrypt implements Cloneable {
}
}
- protected synchronized PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
+ protected PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
PeerEncryptor result = memberToPeerEncryptor.get(member);
if (result == null) {
- result = createPeerEncryptor(member);
+ synchronized (this) {
+ result = memberToPeerEncryptor.get(member);
+ if (result == null) {
+ result = createPeerEncryptor(member, (byte[]) view.getPublicKey(member));
+ }
+ }
}
return result;
}
- private PeerEncryptor createPeerEncryptor(InternalDistributedMember member) throws Exception {
- byte[] peerKeyBytes = (byte[]) view.getPublicKey(member);
+ private PeerEncryptor createPeerEncryptor(InternalDistributedMember member, byte[] peerKeyBytes) throws Exception {
PeerEncryptor result = new PeerEncryptor(peerKeyBytes);
memberToPeerEncryptor.put(member, result);
return result;
@@ -274,7 +297,7 @@ public class GMSEncrypt implements Cloneable {
this.peerPublicKey = getPublicKey(peerPublicKeyBytes);
}
- public byte[] encryptBytes(byte[] data) throws Exception {
+ public synchronized byte[] encryptBytes(byte[] data) throws Exception {
String algo = null;
if (this.peerSKAlgo != null) {
algo = this.peerSKAlgo;
@@ -295,7 +318,7 @@ public class GMSEncrypt implements Cloneable {
return encrypt;
}
- public byte[] decryptBytes(byte[] data) throws Exception {
+ public synchronized byte[] decryptBytes(byte[] data) throws Exception {
String algo = null;
if (this.peerSKAlgo != null) {
algo = this.peerSKAlgo;
@@ -352,7 +375,7 @@ public class GMSEncrypt implements Cloneable {
int blocksize = getBlockSize(dhSKAlgo);
if (keysize == -1 || blocksize == -1) {
- // TODO how should we do here
+ // TODO how should we do here, should we just throw runtime exception?
/* SecretKey sKey = ka.generateSecret(dhSKAlgo);
* encrypt = Cipher.getInstance(dhSKAlgo);
* encrypt.init(Cipher.ENCRYPT_MODE, sKey); */
@@ -403,7 +426,7 @@ public class GMSEncrypt implements Cloneable {
int blocksize = getBlockSize(dhSKAlgo);
if (keysize == -1 || blocksize == -1) {
- // TODO: how to do here
+ // TODO: how to do here, should we just throw runtime exception?
/* SecretKey sKey = ka.generateSecret(dhSKAlgo);
* decrypt = Cipher.getInstance(dhSKAlgo);
* decrypt.init(Cipher.DECRYPT_MODE, sKey); */
@@ -431,8 +454,6 @@ public class GMSEncrypt implements Cloneable {
SecretKey sKey = ka.generateSecret(dhSKAlgo);
return sKey.getEncoded();
} else {
- String algoStr = getDhAlgoStr(dhSKAlgo);
-
return ka.generateSecret();
}
}
@@ -453,16 +474,13 @@ public class GMSEncrypt implements Cloneable {
/***
* this will hold the common key for cluster
- * that will be created using publickey of all the members..
- *
*/
protected class ClusterEncryptor {
byte[] secretBytes;
+ //TODO: need to look this is thread safe
Cipher encrypt;
Cipher decrypt;
- int viewId;
- Set<InternalDistributedMember> mbrs;
-
+
public ClusterEncryptor(GMSEncrypt other) throws Exception {
GMSEncrypt mine = new GMSEncrypt(other.services);
this.secretBytes = GMSEncrypt.generateSecret(mine.dhSKAlgo, mine.dhPrivateKey, other.dhPublicKey);
@@ -472,7 +490,7 @@ public class GMSEncrypt implements Cloneable {
this.secretBytes = sb;
}
- public byte[] encryptBytes(byte[] data) throws Exception {
+ public synchronized byte[] encryptBytes(byte[] data) throws Exception {
String algo = dhSKAlgo;
return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo));
}
@@ -491,7 +509,7 @@ public class GMSEncrypt implements Cloneable {
return encrypt;
}
- public byte[] decryptBytes(byte[] data) throws Exception {
+ public synchronized byte[] decryptBytes(byte[] data) throws Exception {
String algo = dhSKAlgo;
Cipher c = getDecryptCipher(algo);
return GMSEncrypt.decryptBytes(data, c);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index cba5d5f..eedfc5f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -19,10 +19,13 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -33,12 +36,15 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -80,10 +86,13 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
import com.gemstone.gemfire.internal.ClassPathLoader;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.OSProcess;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.Version;
@@ -256,11 +265,11 @@ public class JGroupsMessenger implements Messenger {
if ( !dc.getSecurityClientDHAlgo().isEmpty() ) {
try {
this.encrypt = new GMSEncrypt(services);
+ logger.info("Initializing GMSEncrypt ");
} catch (Exception e) {
throw new GemFireConfigException("problem initializing encryption protocol", e);
}
}
-
}
@Override
@@ -389,12 +398,7 @@ public class JGroupsMessenger implements Messenger {
addressesWithIoExceptionsProcessed.clear();
if (encrypt != null) {
- try {
- encrypt.installView(v);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ encrypt.installView(v);
}
}
@@ -607,20 +611,7 @@ public class JGroupsMessenger implements Messenger {
public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) {
return send(msg, false);
}
-
- @Override
- public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
- if (this.encrypt != null) {
- try {
- this.encrypt.installView(alternateView);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- return send(msg, true);
- }
-
+
@Override
public Set<InternalDistributedMember> send(DistributionMessage msg) {
return send(msg, true);
@@ -670,7 +661,7 @@ public class JGroupsMessenger implements Messenger {
if (useMcast) {
long startSer = theStats.startMsgSerialization();
- Message jmsg = createJGMessage(msg, local, null, Version.CURRENT_ORDINAL);
+ Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
theStats.endMsgSerialization(startSer);
Exception problem = null;
@@ -712,7 +703,7 @@ public class JGroupsMessenger implements Messenger {
} // useMcast
else { // ! useMcast
int len = destinations.length;
- List<InternalDistributedMember> calculatedMembers; // explicit list of members
+ List<GMSMember> calculatedMembers; // explicit list of members
int calculatedLen; // == calculatedMembers.len
if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all
// Grab a copy of the current membership
@@ -720,40 +711,51 @@ public class JGroupsMessenger implements Messenger {
// Construct the list
calculatedLen = v.size();
- calculatedMembers = new LinkedList<InternalDistributedMember>();
+ calculatedMembers = new LinkedList<GMSMember>();
for (int i = 0; i < calculatedLen; i ++) {
InternalDistributedMember m = (InternalDistributedMember)v.get(i);
- calculatedMembers.add(m);
+ calculatedMembers.add((GMSMember)m.getNetMember());
}
} // send to all
else { // send to explicit list
calculatedLen = len;
- calculatedMembers = new LinkedList<>();
+ calculatedMembers = new LinkedList<GMSMember>();
for (int i = 0; i < calculatedLen; i ++) {
- calculatedMembers.add(destinations[i]);
+ calculatedMembers.add((GMSMember)destinations[i].getNetMember());
}
} // send to explicit list
Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
long startSer = theStats.startMsgSerialization();
-
- boolean encode = (encrypt != null);
-
boolean firstMessage = true;
+ for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) {
+ GMSMember mbr = it.next();
+ short version = mbr.getVersionOrdinal();
+ if ( !messages.containsKey(version) ) {
+ Message jmsg = createJGMessage(msg, local, version);
+ messages.put(version, jmsg);
+ if (firstMessage) {
+ theStats.incSentBytes(jmsg.getLength());
+ firstMessage = false;
+ }
+ }
+ }
+ theStats.endMsgSerialization(startSer);
Collections.shuffle(calculatedMembers);
int i=0;
- for (InternalDistributedMember mbr: calculatedMembers) {
- short version = mbr.getNetMember().getVersionOrdinal();
+ for (GMSMember mbr: calculatedMembers) {
JGAddress to = new JGAddress(mbr);
- Message jmsg = createJGMessage(msg, local, mbr, version);
+ short version = mbr.getVersionOrdinal();
+ Message jmsg = (Message)messages.get(version);
Exception problem = null;
try {
+ Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
if (!reliably) {
jmsg.setFlag(Message.Flag.NO_RELIABILITY);
}
- jmsg.setDest(to);
- jmsg.setSrc(this.jgAddress);
+ tmp.setDest(to);
+ tmp.setSrc(this.jgAddress);
logger.trace("Unicasting to {}", to);
- myChannel.send(jmsg);
+ myChannel.send(tmp);
}
catch (Exception e) {
problem = e;
@@ -810,7 +812,7 @@ public class JGroupsMessenger implements Messenger {
* @param version the version of the recipient
* @return the new message
*/
- Message createJGMessage(DistributionMessage gfmsg, JGAddress src, InternalDistributedMember recipient, short version) {
+ Message createJGMessage(DistributionMessage gfmsg, JGAddress src, short version) {
if(gfmsg instanceof DirectReplyMessage) {
((DirectReplyMessage) gfmsg).registerProcessor();
}
@@ -820,35 +822,17 @@ public class JGroupsMessenger implements Messenger {
setMessageFlags(gfmsg, msg);
try {
long start = services.getStatistics().startMsgSerialization();
- HeapDataOutputStream out_stream =
- new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+ byte[] messageBytes = null;
+ HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
Version.CURRENT.writeOrdinal(out_stream, true);
- DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
- boolean encode = encrypt != null && recipient != null;
- if (encode) {
- // Coordinator doesn't know our publicKey for a JoinRequest
- if (gfmsg.getDSFID() == JOIN_REQUEST || gfmsg.getDSFID() == JOIN_RESPONSE) {
- encode = false;
- }
- }
- if (encode) {
- logger.info("encoding {}", gfmsg);
- try {
- out_stream.writeBoolean(true); // TODO we should have flag bits
- HeapDataOutputStream out_stream2 =
- new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
- DataSerializer.writeObject(gfmsg, out_stream2);
- byte[] payload = out_stream2.toByteArray();
- payload = encrypt.encryptData(payload, recipient);
- DataSerializer.writeByteArray(payload, out_stream);
- } catch (Exception e) {
- throw new GemFireIOException("unable to send message", e);
- }
+ if(encrypt != null) {
+ out_stream.writeBoolean(true);
+ writeEncryptedMessage(gfmsg, version, out_stream);
} else {
- logger.info("not encoding {}", gfmsg);
out_stream.writeBoolean(false);
- DataSerializer.writeObject(gfmsg, out_stream);
+ serializeMessage(gfmsg, out_stream);
}
+
msg.setBuffer(out_stream.toByteArray());
services.getStatistics().endMsgSerialization(start);
}
@@ -862,9 +846,82 @@ public class JGroupsMessenger implements Messenger {
ioe.initCause(ex);
throw ioe;
}
+ } catch(Exception ex){
+ logger.warn("Error serializing message", ex);
+ GemFireIOException ioe = new
+ GemFireIOException("Error serializing message");
+ ioe.initCause(ex.getCause());
+ throw ioe;
}
return msg;
}
+
+ void writeEncryptedMessage(DistributionMessage gfmsg, short version, HeapDataOutputStream out) throws Exception {
+ InternalDataSerializer.writeDSFIDHeader(gfmsg.getDSFID(), out);
+ byte[] pk = null;
+ int requestId = 0;
+ InternalDistributedMember pkMbr = null;
+ switch (gfmsg.getDSFID()) {
+ case FIND_COORDINATOR_REQ:
+ case JOIN_REQUEST:
+ //need to append mine PK
+ pk = encrypt.getPublicKey(localAddress);
+
+ pkMbr = gfmsg.getRecipients()[0];
+ requestId = getRequestId(gfmsg, true);
+ break;
+ case FIND_COORDINATOR_RESP:
+ case JOIN_RESPONSE:
+ pkMbr = gfmsg.getRecipients()[0];
+ requestId = getRequestId(gfmsg, false);
+ default:
+ break;
+ }
+ logger.debug("writeEncryptedMessage gfmsg.getDSFID() = {} for {} with requestid {}", gfmsg.getDSFID(), pkMbr, requestId);
+ out.writeInt(requestId);
+ if (pk != null) {
+ InternalDataSerializer.writeByteArray(pk, out);
+ }
+
+ HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+ byte[] messageBytes = serializeMessage(gfmsg, out_stream);
+
+ if (pkMbr != null) {
+ // using members private key
+ messageBytes = encrypt.encryptData(messageBytes, pkMbr);
+ } else {
+ // using cluster secret key
+ messageBytes = encrypt.encryptData(messageBytes);
+ }
+ InternalDataSerializer.writeByteArray(messageBytes, out);
+ }
+
+ int getRequestId(DistributionMessage gfmsg, boolean add) {
+ int requestId = 0;
+ if (gfmsg instanceof FindCoordinatorRequest) {
+ requestId = ((FindCoordinatorRequest) gfmsg).getRequestId();
+ } else if (gfmsg instanceof JoinRequestMessage) {
+ requestId = ((JoinRequestMessage) gfmsg).getRequestId();
+ } else if (gfmsg instanceof FindCoordinatorResponse) {
+ requestId = ((FindCoordinatorResponse) gfmsg).getRequestId();
+ } else if (gfmsg instanceof JoinResponseMessage) {
+ requestId = ((JoinResponseMessage) gfmsg).getRequestId();
+ }
+
+ if (add) {
+ addRequestId(requestId, gfmsg.getRecipients()[0]);
+ }
+
+ return requestId;
+ }
+
+ byte[] serializeMessage(DistributionMessage gfmsg, HeapDataOutputStream out_stream) throws IOException {
+
+ DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
+ DataSerializer.writeObject(gfmsg, out_stream);
+
+ return out_stream.toByteArray();
+ }
void setMessageFlags(DistributionMessage gfmsg, Message msg) {
// GemFire uses its own reply processors so there is no need
@@ -905,17 +962,14 @@ public class JGroupsMessenger implements Messenger {
// as STABLE_GOSSIP
logger.trace("message length is zero - ignoring");
return null;
- }
-
- InternalDistributedMember sender = null;
+ }
Exception problem = null;
byte[] buf = jgmsg.getRawBuffer();
try {
long start = services.getStatistics().startMsgDeserialization();
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf,
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf,
jgmsg.getOffset(), jgmsg.getLength()));
short ordinal = Version.readOrdinal(dis);
@@ -924,44 +978,33 @@ public class JGroupsMessenger implements Messenger {
dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
ordinal, true));
}
-
- GMSMember m = DataSerializer.readObject(dis);
-
- sender = getMemberFromView(m, ordinal);
-
- boolean encrypted = dis.readBoolean();
-
- if (encrypted && encrypt != null) {
- byte[] payload = DataSerializer.readByteArray(dis);
- try {
- payload = encrypt.decryptData(payload, sender);
- dis = new DataInputStream(new ByteArrayInputStream(payload));
- if (ordinal < Version.CURRENT_ORDINAL) {
- dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
- ordinal, true));
- }
- } catch (Exception e) {
- throw new GemFireIOException("unable to receive message", e);
- }
- }
-
- result = DataSerializer.readObject(dis);
-
- DistributionMessage dm = (DistributionMessage)result;
+
+ //read
+ boolean isEncrypted = dis.readBoolean();
+
+ if(isEncrypted && encrypt == null) {
+ throw new GemFireConfigException("Got remote message as encrypted");
+ }
- // JoinRequestMessages are sent with an ID that may have been
- // reused from a previous life by way of auto-reconnect,
- // so we don't want to find a canonical reference for the
- // request's sender ID
- if (dm.getDSFID() == JOIN_REQUEST) {
- sender = ((JoinRequestMessage)dm).getMemberID();
+ if(isEncrypted) {
+ result = readEncryptedMessage(dis, ordinal, encrypt);
+ } else {
+ GMSMember m = DataSerializer.readObject(dis);
+
+ result = DataSerializer.readObject(dis);
+
+ DistributionMessage dm = (DistributionMessage)result;
+
+ setSender(dm, m, ordinal);
}
- ((DistributionMessage)result).setSender(sender);
+
services.getStatistics().endMsgDeserialization(start);
}
catch (ClassNotFoundException | IOException | RuntimeException e) {
problem = e;
+ } catch(Exception e) {
+ problem = e;
}
if (problem != null) {
logger.error(LocalizedMessage.create(
@@ -972,37 +1015,113 @@ public class JGroupsMessenger implements Messenger {
return result;
}
-
- /** look for certain messages that may need to be altered before being sent */
- void filterOutgoingMessage(DistributionMessage m) {
- switch (m.getDSFID()) {
+ void setSender(DistributionMessage dm, GMSMember m, short ordinal) {
+ InternalDistributedMember sender = null;
+ // JoinRequestMessages are sent with an ID that may have been
+ // reused from a previous life by way of auto-reconnect,
+ // so we don't want to find a canonical reference for the
+ // request's sender ID
+ if (dm.getDSFID() == JOIN_REQUEST) {
+ sender = ((JoinRequestMessage)dm).getMemberID();
+ } else {
+ sender = getMemberFromView(m, ordinal);
+ }
+ dm.setSender(sender);
+ }
+
+ @SuppressWarnings("resource")
+ DistributionMessage readEncryptedMessage(DataInputStream dis, short ordinal, GMSEncrypt encryptLocal) throws Exception {
+ int dfsid = InternalDataSerializer.readDSFIDHeader(dis);
+ int requestId = dis.readInt();
+
+ try {
+ // TODO seems like we don't need this, just set bit that PK is appended
+
+ logger.debug("readEncryptedMessage Reading Request id " + dfsid + " and requestid is " + requestId + " myid " + this.localAddress);
+ InternalDistributedMember pkMbr = null;
+ boolean readPK = false;
+ switch (dfsid) {
+ case FIND_COORDINATOR_REQ:
case JOIN_REQUEST:
- if (encrypt == null) {
- break;
- }
- JoinRequestMessage joinMsg = (JoinRequestMessage)m;
- joinMsg.setPublicKey(encrypt.getPublicKeyBytes());
+ readPK = true;
break;
-
+ case FIND_COORDINATOR_RESP:
case JOIN_RESPONSE:
- JoinResponseMessage jrsp = (JoinResponseMessage)m;
+ // this will have requestId to know the PK
+ pkMbr = getRequestedMember(requestId);
+ break;
+ }
+
+ byte[] data;
+
+ byte[] pk = null;
+
+ if (readPK) {
+ // need to read PK
+ pk = InternalDataSerializer.readByteArray(dis);
+ // encrypt.setPublicKey(publickey, mbr);
+ data = InternalDataSerializer.readByteArray(dis);
+ // using prefixed pk from sender
+ data = encryptLocal.decryptData(data, pk);
+ } else {
+ data = InternalDataSerializer.readByteArray(dis);
+ // from cluster key
+ if (pkMbr != null) {
+ // using member public key
+ data = encryptLocal.decryptData(data, pkMbr);
+ } else {
+ // from cluster key
+ data = encryptLocal.decryptData(data);
+ }
+ }
+
+ {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+ if (ordinal < Version.CURRENT_ORDINAL) {
+ in = new VersionedDataInputStream(in, Version.fromOrdinalNoThrow(ordinal, true));
+ }
+
+ GMSMember m = DataSerializer.readObject(in);
+
+ DistributionMessage result = (DistributionMessage) DataSerializer.readObject(in);
+
+ setSender(result, m, ordinal);
- if (jrsp.getRejectionMessage() == null
+ if (pk != null) {
+ encryptLocal.setPublicKey(pk, result.getSender());
+ }
+
+ return result;
+ }
+ } catch (Exception e) {
+ throw new Exception("Message id is " + dfsid, e);
+ }
+
+ }
+
+ /** look for certain messages that may need to be altered before being sent */
+ void filterOutgoingMessage(DistributionMessage m) {
+ switch (m.getDSFID()) {
+ case JOIN_RESPONSE:
+ JoinResponseMessage jrsp = (JoinResponseMessage)m;
+
+ if (jrsp.getRejectionMessage() == null
&& services.getConfig().getTransport().isMcastEnabled()) {
- // get the multicast message digest and pass it with the join response
- Digest digest = (Digest)this.myChannel.getProtocolStack()
+ // get the multicast message digest and pass it with the join response
+ Digest digest = (Digest)this.myChannel.getProtocolStack()
.getTopProtocol().down(Event.GET_DIGEST_EVT);
- HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
- try {
- digest.writeTo(hdos);
- } catch (Exception e) {
- logger.fatal("Unable to serialize JGroups messaging digest", e);
- }
- jrsp.setMessengerData(hdos.toByteArray());
+ HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
+ try {
+ digest.writeTo(hdos);
+ } catch (Exception e) {
+ logger.fatal("Unable to serialize JGroups messaging digest", e);
}
- break;
- default:
- break;
+ jrsp.setMessengerData(hdos.toByteArray());
+ }
+ break;
+ default:
+ break;
}
}
@@ -1190,7 +1309,7 @@ public class JGroupsMessenger implements Messenger {
if (clazz.isAssignableFrom(msgClazz)) {
h = handlers.get(clazz);
handlers.put(msg.getClass(), h);
- break;
+ break;
}
}
}
@@ -1201,6 +1320,72 @@ public class JGroupsMessenger implements Messenger {
}
}
+ @Override
+ public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
+ if (this.encrypt != null) {
+ this.encrypt.installView(alternateView);
+ }
+ return send(msg, true);
+ }
+
+ @Override
+ public byte[] getPublickey(InternalDistributedMember mbr) {
+ if (encrypt != null) {
+ return encrypt.getPublicKey(mbr);
+ }
+ return null;
+ }
+
+ @Override
+ public void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
+ if (encrypt != null) {
+ logger.debug("Setting pK for member " + mbr);
+ encrypt.setPublicKey(publickey, mbr);
+ }
+ }
+
+ @Override
+ public void setClusterSecretKey(byte[] clusterSecretKey) {
+ if (encrypt != null) {
+ logger.debug("Setting cluster key");
+ encrypt.addClusterKey(clusterSecretKey);
+ }
+ }
+
+ @Override
+ public byte[] getClusterSecretKey() {
+ if (encrypt != null) {
+ return encrypt.getClusterSecretKey();
+ }
+ return null;
+ }
+
+ private Random randomId = new Random();
+ private HashMap<Integer, InternalDistributedMember> requestIdVsRecipients = new HashMap<>();
+ InternalDistributedMember getRequestedMember(int requestId) {
+ //TODO: what if we don't get response, need to remove this otherwise it will be leak
+ return requestIdVsRecipients.remove(requestId);
+ }
+
+ void addRequestId(int requestId, InternalDistributedMember mbr) {
+ requestIdVsRecipients.put(requestId, mbr);
+ }
+ @Override
+ public int getRequestId() {
+ return randomId.nextInt();
+ }
+
+ @Override
+ public void initClusterKey() {
+ if (encrypt != null) {
+ try {
+ logger.debug("Initializing cluster key");
+ encrypt.initClusterSecretKey();
+ } catch (Exception e) {
+ throw new RuntimeException("unable to create cluster key ", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
index bff592b..4574292 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
@@ -2751,6 +2751,24 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
}
}
+ public static final int readDSFIDHeader(final DataInput in)
+ throws IOException, ClassNotFoundException
+ {
+ checkIn(in);
+ byte header = in.readByte();
+ if (header == DS_FIXED_ID_BYTE) {
+ return in.readByte();
+ } else if (header == DS_FIXED_ID_SHORT) {
+ return in.readShort();
+ } else if (header == DS_NO_FIXED_ID) {
+ return Integer.MAX_VALUE;//is that correct??
+ } else if (header == DS_FIXED_ID_INT) {
+ return in.readInt();
+ } else {
+ throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
+ }
+ }
+
/**
* Reads an instance of <code>String</code> from a
* <code>DataInput</code> given the header byte already being read.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
index 011b8f5..582149c 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
@@ -256,6 +256,7 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
p.put(DistributionConfig.MCAST_TTL_NAME, mcastttl);
p.put(DistributionConfig.LOCATORS_NAME, "localhost[" + locatorPort +"]");
p.put(DistributionConfig.LOG_LEVEL_NAME, "info");
+ p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
return p;
}
@@ -288,6 +289,7 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
locatorProps.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastport);
locatorProps.setProperty(DistributionConfig.MCAST_TTL_NAME, mcastttl);
locatorProps.setProperty(DistributionConfig.LOG_LEVEL_NAME, "info");
+ locatorProps.setProperty(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
//locatorProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
try {
final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locatorPort, null, null,
[3/3] incubator-geode git commit: Merge branch 'feature/GEODE-1372'
of https://git-wip-us.apache.org/repos/asf/incubator-geode into GEODE-1372
Posted by hi...@apache.org.
Merge branch 'feature/GEODE-1372' of https://git-wip-us.apache.org/repos/asf/incubator-geode into GEODE-1372
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/667c4259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/667c4259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/667c4259
Branch: refs/heads/feature/GEODE-1372
Commit: 667c4259f4dc399766b37b03b999e70fe10761b8
Parents: b6a7344 d6fb783
Author: Hitesh Khamesra <hi...@yahoo.com>
Authored: Wed Jun 1 15:28:18 2016 -0700
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Wed Jun 1 15:28:18 2016 -0700
----------------------------------------------------------------------
.../membership/gms/messenger/GMSEncryptJUnitTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/667c4259/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
----------------------------------------------------------------------