You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by qi...@apache.org on 2015/08/13 19:51:17 UTC
[1/3] incubator-geode git commit: GEODE-77: Implement Authenticator
interface in class GMSAuthenticator with unit tests.
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-77 52f8ce6d1 -> 0a70d5140
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
index db27b95..c4d9c58 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
@@ -2129,7 +2129,12 @@ public class LocalizedStrings extends ParentLocalizedStrings {
public static final StringId Network_partition_detected = new StringId(6607, "Exiting due to possible network partition event due to loss of {0} cache processes: {1}");
-
+ // GMSAuthenticator
+ public static final StringId AUTH_PEER_AUTHENTICATION_FAILED_WITH_EXCEPTION = new StringId(6608, "Authentication failed for [{0}] using Authenticator [{1}]. {2}");
+ public static final StringId AUTH_PEER_AUTHENTICATION_FAILED = new StringId(6609, "Authentication failed. See coordinator [{0}] logs for details.");
+ public static final StringId AUTH_PEER_AUTHENTICATION_MISSING_CREDENTIALS = new StringId(6610, "Failed to find credentials from [{0}] using Authenticator [{1}]");
+ public static final StringId AUTH_FAILED_TO_ACQUIRE_AUTHINITIALIZE_INSTANCE = new StringId(6611, "AuthInitialize instance could not be obtained");
+
/** Testing strings, messageId 90000-99999 **/
/** These are simple messages for testing, translated with Babelfish. **/
@@ -2140,4 +2145,6 @@ public class LocalizedStrings extends ParentLocalizedStrings {
public static final StringId LISTENER_PREFIX = new StringId(90004, "Listener_");
public static final StringId DistributedRegion_INITIALIZING_REGION_COMPLETED_0 = new StringId(90005, "Initialization of region {0} completed");
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticatorJUnitTest.java
new file mode 100644
index 0000000..8775d0f
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticatorJUnitTest.java
@@ -0,0 +1,300 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.auth;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.security.AuthInitialize;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+import com.gemstone.gemfire.security.Authenticator;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import java.security.Principal;
+import java.util.Properties;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+
+@Category(UnitTest.class)
+public class GMSAuthenticatorJUnitTest {
+
+ static String prefix = "com.gemstone.gemfire.distributed.internal.membership.gms.auth.GMSAuthenticatorJUnitTest$";
+
+ Properties originalSystemProps = null;
+ Properties props = null;
+ Services services = null;
+ GMSAuthenticator authenticator = null;
+ DistributedMember member = null;
+
+ @Before
+ public void setUp() throws Exception {
+ originalSystemProps = System.getProperties();
+ props = new Properties();
+ authenticator = new GMSAuthenticator();
+ services = mock(Services.class);
+ InternalLogWriter securityLog = mock(InternalLogWriter.class);
+ when(services.getSecurityLogWriter()).thenReturn(securityLog);
+ authenticator.init(services);
+ member = mock(DistributedMember.class);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ System.setProperties(originalSystemProps);
+ }
+
+ @Test
+ public void testGetSecurityProps() throws Exception {
+ props.setProperty("gemfire.sys.security-peer-auth-init", "dummy1");
+ props.setProperty("gemfire.sys.security-peer-authenticator", "dummy2");
+ props.setProperty("security-auth-init", "dummy3");
+ System.setProperties(props);
+ Properties secProps = authenticator.getSecurityProps();
+ assertEquals("wrong size", 2, secProps.size());
+ assertEquals("wrong value", "dummy1", secProps.getProperty("security-peer-auth-init"));
+ assertEquals("wrong value", "dummy2", secProps.getProperty("security-peer-authenticator"));
+ }
+
+ @Test
+ public void testGetCredentialNormal() throws Exception {
+ props.setProperty("security-peer-auth-init", prefix + "TestAuthInit2.create");
+ TestAuthInit2 auth = new TestAuthInit2();
+ assertFalse(auth.isClosed());
+ TestAuthInit2.setAuthInitialize(auth);
+ Properties credential = authenticator.getCredentials(member, props);
+ assertTrue(props == credential);
+ assertTrue(auth.isClosed());
+ assertTrue(TestAuthInit2.getCreateCount() == 1);
+ }
+
+ @Test
+ public void testGetCredentialWithNoAuth() throws Exception {
+ Properties credential = authenticator.getCredentials(member, props);
+ assertNull(credential);
+ }
+
+ @Test
+ public void testGetCredentialWithEmptyAuth() throws Exception {
+ props.setProperty("security-peer-auth-init", "");
+ Properties credential = authenticator.getCredentials(member, props);
+ assertNull(credential);
+ }
+
+ @Test
+ public void testGetCredentialWithNotExistAuth() throws Exception {
+ props.setProperty("security-peer-auth-init", prefix + "NotExistAuth.create");
+ verifyNegativeGetCredential(props, "Failed to acquire AuthInitialize method");
+ }
+
+ @Test
+ public void testGetCredentialWithNullAuth() throws Exception {
+ props.setProperty("security-peer-auth-init", prefix + "TestAuthInit1.create");
+ verifyNegativeGetCredential(props, "AuthInitialize instance could not be obtained");
+ }
+
+ @Test
+ public void testGetCredentialWithInitError() throws Exception {
+ props.setProperty("security-peer-auth-init", prefix + "TestAuthInit3.create");
+ verifyNegativeGetCredential(props, "expected init error");
+ }
+
+ @Test
+ public void testGetCredentialWithError() throws Exception {
+ props.setProperty("security-peer-auth-init", prefix + "TestAuthInit4.create");
+ verifyNegativeGetCredential(props, "expected get credential error");
+ }
+
+ void verifyNegativeGetCredential(Properties props, String expectedError) throws Exception {
+ try {
+ authenticator.getCredentials(member, props);
+ fail("should catch: " + expectedError);
+ } catch (GemFireSecurityException e) {
+ assertTrue(e.getMessage().startsWith(expectedError));
+ }
+ }
+
+ @Test
+ public void testAuthenticatorNormal() throws Exception {
+ props.setProperty("security-peer-authenticator", prefix + "TestAuthenticator4.create");
+ TestAuthenticator4 auth = new TestAuthenticator4();
+ assertFalse(auth.isClosed());
+ TestAuthenticator4.setAuthenticator(auth);
+ String result = authenticator.authenticate(member, props, props, member);
+ assertNull(result);
+ assertTrue(auth.isClosed());
+ assertTrue(TestAuthenticator4.getCreateCount() == 1);
+ }
+
+ @Test
+ public void testAuthenticatorWithNoAuth() throws Exception {
+ String result = authenticator.authenticate(member, props, props, member);
+ assertNull(result);
+ }
+
+ @Test
+ public void testAuthenticatorWithEmptyAuth() throws Exception {
+ props.setProperty("security-peer-authenticator", "");
+ String result = authenticator.authenticate(member, props, props, member);
+ assertNull(result);
+ }
+
+ @Test
+ public void testAuthenticatorWithNotExistAuth() throws Exception {
+ props.setProperty("security-peer-authenticator", prefix + "NotExistAuth.create");
+ verifyNegativeAuthenticate(props, props, "Authentication failed. See coordinator");
+ }
+
+ @Test
+ public void testAuthenticatorWithNullAuth() throws Exception {
+ props.setProperty("security-peer-authenticator", prefix + "TestAuthenticator1.create");
+ verifyNegativeAuthenticate(props, props, "Authentication failed. See coordinator");
+ }
+
+ @Test
+ public void testAuthenticatorWithNullCredential() throws Exception {
+ props.setProperty("security-peer-authenticator", prefix + "TestAuthenticator1.create");
+ verifyNegativeAuthenticate(null, props, "Failed to find credentials from");
+ }
+
+ @Test
+ public void testAuthenticatorWithAuthInitFailure() throws Exception {
+ props.setProperty("security-peer-authenticator", prefix + "TestAuthenticator2.create");
+ verifyNegativeAuthenticate(props, props, "Authentication failed. See coordinator");
+ }
+
+ @Test
+ public void testAuthenticatorWithAuthFailure() throws Exception {
+ props.setProperty("security-peer-authenticator", prefix + "TestAuthenticator3.create");
+ verifyNegativeAuthenticate(props, props, "Authentication failed. See coordinator");
+ }
+
+ void verifyNegativeAuthenticate(Object credential, Properties props, String expectedError) throws Exception {
+ String result = authenticator.authenticate(member, credential, props, member);
+ assertTrue(result, result.startsWith(expectedError));
+ }
+
+ // ----------------------------------------
+ // Test AuthInitialize
+ // ----------------------------------------
+
+ public static class TestAuthInit1 implements AuthInitialize {
+ public static AuthInitialize create() {
+ return null;
+ }
+ public void init(LogWriter systemLogger, LogWriter securityLogger) throws AuthenticationFailedException {
+ }
+ public Properties getCredentials(Properties props, DistributedMember server, boolean isPeer)
+ throws AuthenticationFailedException {
+ throw new AuthenticationFailedException("expected get credential error");
+ }
+ public void close() {
+ }
+ }
+
+ public static class TestAuthInit2 extends TestAuthInit1 {
+ static TestAuthInit2 instance = null;
+ static int createCount = 0;
+ public static void setAuthInitialize(TestAuthInit2 auth) {
+ instance = auth;
+ }
+ public static AuthInitialize create() {
+ createCount ++;
+ return instance;
+ }
+ public Properties getCredentials(Properties props, DistributedMember server, boolean isPeer)
+ throws AuthenticationFailedException {
+ return props;
+ }
+ boolean closed = false;
+ public void close() {
+ closed = true;
+ }
+ public boolean isClosed() {
+ return closed;
+ }
+ public static int getCreateCount() {
+ return createCount;
+ }
+ }
+
+ public static class TestAuthInit3 extends TestAuthInit1 {
+ public static AuthInitialize create() {
+ return new TestAuthInit3();
+ }
+ public void init(LogWriter systemLogger, LogWriter securityLogger) throws AuthenticationFailedException {
+ throw new AuthenticationFailedException("expected init error");
+ }
+ }
+
+ public static class TestAuthInit4 extends TestAuthInit1 {
+ public static AuthInitialize create() {
+ return new TestAuthInit4();
+ }
+ }
+
+ // ----------------------------------------
+ // Test Authenticator
+ // ----------------------------------------
+
+ public static class TestAuthenticator1 implements Authenticator {
+ public static Authenticator create() {
+ return null;
+ }
+ public void init(Properties securityProps, LogWriter systemLogger, LogWriter securityLogger) throws AuthenticationFailedException {
+ }
+ public Principal authenticate(Properties props, DistributedMember member) throws AuthenticationFailedException {
+ return null;
+ }
+ public void close() {
+ }
+ }
+
+ public static class TestAuthenticator2 extends TestAuthenticator1 {
+ public static Authenticator create() {
+ return new TestAuthenticator2();
+ }
+ public void init(Properties securityProps, LogWriter systemLogger, LogWriter securityLogger) throws AuthenticationFailedException {
+ throw new AuthenticationFailedException("expected init error");
+ }
+ }
+
+ public static class TestAuthenticator3 extends TestAuthenticator1 {
+ public static Authenticator create() {
+ return new TestAuthenticator3();
+ }
+ public Principal authenticate(Properties props, DistributedMember member) throws AuthenticationFailedException {
+ throw new AuthenticationFailedException("expected authenticate error");
+ }
+ }
+
+ public static class TestAuthenticator4 extends TestAuthenticator1 {
+ static Authenticator instance = null;
+ static int createCount = 0;
+ public static void setAuthenticator(Authenticator auth) {
+ instance = auth;
+ }
+ public static Authenticator create() {
+ createCount ++;
+ return instance;
+ }
+ public Principal authenticate(Properties props, DistributedMember member) throws AuthenticationFailedException {
+ return null;
+ }
+ boolean closed = false;
+ public void close() {
+ closed = true;
+ }
+ public boolean isClosed() {
+ return closed;
+ }
+ public static int getCreateCount() {
+ return createCount;
+ }
+ }
+}
[2/3] incubator-geode git commit: GEODE-77: Implement Authenticator
interface in class GMSAuthenticator with unit tests.
Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index cb4f9c9..76d9d71 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -1,1178 +1,1178 @@
-package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
-
-import static com.gemstone.gemfire.distributed.internal.DistributionManager.LOCATOR_DM_TYPE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.SystemConnectException;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.DistributionMessage;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
-import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
-import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
-import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.security.AuthenticationFailedException;
-
-/**
- * GMSJoinLeave handles membership communication with other processes in the
- * distributed system. It replaces the JGroups channel membership services
- * that Geode formerly used for this purpose.
- *
- */
-public class GMSJoinLeave implements JoinLeave, MessageHandler {
-
- /** number of times to try joining before giving up */
- private static final int JOIN_ATTEMPTS = Integer.getInteger("gemfire.join-attempts", 4);
-
- /** amount of time to sleep before trying to join after a failed attempt */
- private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
-
- /** amount of time to wait for a view to be acked by all members before performing suspect processing on non-responders */
- private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("gemfire.view-ack-timeout", 12500);
-
- /** stall time to wait for concurrent join/leave/remove requests to be received */
- private static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 2000);
-
- /** time to wait for a leave request to be transmitted by jgroups */
- private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 2000);
-
- /** membership logger */
- private static final Logger logger = Services.getLogger();
-
-
- /** the view ID where I entered into membership */
- private int birthViewId;
-
- /** my address */
- private InternalDistributedMember localAddress;
-
- private Services services;
-
- /** have I connected to the distributed system? */
- private boolean isJoined;
-
- /** a lock governing GMS state */
- private ReadWriteLock stateLock = new ReentrantReadWriteLock();
-
- /** guarded by stateLock */
- private boolean isCoordinator;
-
- /** a synch object that guards view installation */
- private final Object viewInstallationLock = new Object();
-
- /** the currently installed view */
- private volatile NetView currentView;
-
- /** a new view being installed */
- private NetView preparedView;
-
- /** the last view that conflicted with view preparation */
- private NetView lastConflictingView;
-
- private List<InetSocketAddress> locators;
-
- /** a list of join/leave/crashes */
- private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
-
- /** collects the response to a join request */
- private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
-
- /** collects responses to new views */
- private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
-
- /** collects responses to view preparation messages */
- private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
-
- /** whether quorum checks can cause a forced-disconnect */
- private boolean quorumRequired = false;
-
- /** timeout in receiving view acknowledgement */
- private int viewAckTimeout;
-
- /** background thread that creates new membership views */
- private ViewCreator viewCreator;
-
- /** am I shutting down? */
- private volatile boolean isStopping;
-
-
-  /**
-   * Attempts to join the distributed system.
-   * <p>
-   * A locator started with the system property gemfire.first-member becomes the
-   * coordinator immediately. Otherwise, up to JOIN_ATTEMPTS times, this member asks
-   * the locators for the coordinator (findCoordinator). If another member is the
-   * coordinator, it sends that member a join request. If this member itself is
-   * reported as the coordinator, it takes that role only once more than half of the
-   * attempts have been used. It sleeps JOIN_RETRY_SLEEP ms between attempts.
-   *
-   * @return true if this member joined or became the coordinator; false if the
-   *         thread was interrupted (the interrupt status is restored); otherwise
-   *         the value of isJoined after the last attempt
-   */
-  public boolean join() {
-
-    if (this.localAddress.getVmKind() == LOCATOR_DM_TYPE
-        && Boolean.getBoolean("gemfire.first-member")) {
-      becomeCoordinator();
-      return true;
-    }
-
-    for (int tries = 0; tries < JOIN_ATTEMPTS; tries++) {
-      InternalDistributedMember coord = findCoordinator();
-      logger.debug("found possible coordinator {}", coord);
-      if (coord != null) {
-        if (coord.equals(this.localAddress)) {
-          // self-election is deferred until more than half of the attempts are used up
-          if (tries > (JOIN_ATTEMPTS / 2)) {
-            becomeCoordinator();
-            return true;
-          }
-        } else {
-          if (attemptToJoin(coord)) {
-            return true;
-          }
-        }
-      }
-      try {
-        Thread.sleep(JOIN_RETRY_SLEEP);
-      } catch (InterruptedException e) {
-        // restore the interrupt status so callers can observe it (attemptToJoin does the same)
-        Thread.currentThread().interrupt();
-        return false;
-      }
-    } // for
-    return this.isJoined;
-  }
-
-  /**
-   * Sends a join request to the given coordinator and waits for a reply, then
-   * processes the reply.
-   * <p>
-   * Waits at most getJoinTimeout()/JOIN_ATTEMPTS ms on the joinResponse monitor.
-   * If the reply carries a view, this member adopts the view id and split-brain /
-   * coordinator preferences that the coordinator assigned to it, and installs the view.
-   *
-   * @param coord the member believed to be the membership coordinator
-   * @return true if the attempt succeeded; false if no reply arrived in time, the
-   *         thread was interrupted (the interrupt status is restored), or the reply
-   *         had no view
-   * @throws SystemConnectException if the rejection mentions an older product
-   *         version or message id 15806
-   * @throws AuthenticationFailedException for any other rejection
-   */
-  private boolean attemptToJoin(InternalDistributedMember coord) {
-    // send a join request to the coordinator and wait for a response
-    logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
-    JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress,
-        services.getAuthenticator().getCredentials());
-
-    services.getMessenger().send(req);
-
-    JoinResponseMessage response = null;
-    synchronized (joinResponse) {
-      if (joinResponse[0] == null) {
-        try {
-          joinResponse.wait(services.getConfig().getJoinTimeout() / JOIN_ATTEMPTS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return false;
-        }
-      }
-      response = joinResponse[0];
-      // Consume the response while still holding the monitor. Clearing the slot
-      // outside the lock raced with the thread that stores and notifies under this
-      // monitor, and could wipe out a newer response.
-      joinResponse[0] = null;
-    }
-    if (response != null) {
-      String failReason = response.getRejectionMessage();
-      if (failReason != null) {
-        if (failReason.contains("Rejecting the attempt of a member using an older version")
-            || failReason.contains("15806")) {
-          throw new SystemConnectException(failReason);
-        }
-        throw new AuthenticationFailedException(failReason);
-      }
-      if (response.getCurrentView() != null) {
-        this.birthViewId = response.getMemberID().getVmViewId();
-        this.localAddress.setVmViewId(this.birthViewId);
-        GMSMember me = (GMSMember) this.localAddress.getNetMember();
-        GMSMember o = (GMSMember) response.getMemberID().getNetMember();
-        me.setSplitBrainEnabled(o.isSplitBrainEnabled());
-        me.setPreferredForCoordinator(o.preferredForCoordinator());
-        installView(response.getCurrentView());
-        return true;
-      }
-    }
-    return false;
-  }
-
-
-  /**
-   * Processes a join request from another member.
-   * <p>
-   * Members running an older product version are rejected. If the request carries
-   * credentials, they are checked with the authenticator, and a non-empty
-   * rejection (or any exception) is sent back to the requester. Only requests
-   * that pass these checks are queued via recordViewRequest for the next view.
-   *
-   * @param incomingRequest the join request received from a prospective member
-   */
-  private void processJoinRequest(JoinRequestMessage incomingRequest) {
-    if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
-      logger.warn("detected an attempt to start a peer using an older version of the product {}",
-          incomingRequest.getMemberID());
-      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
-      m.setRecipient(incomingRequest.getMemberID());
-      services.getMessenger().send(m);
-      return;
-    }
-    Object creds = incomingRequest.getCredentials();
-    if (creds != null) {
-      String rejection = null;
-      try {
-        rejection = services.getAuthenticator().authenticate(incomingRequest.getMemberID(), creds);
-      } catch (Exception e) {
-        // an exception without a message must still count as a rejection
-        rejection = (e.getMessage() != null) ? e.getMessage() : e.toString();
-      }
-      if (rejection != null && rejection.length() > 0) {
-        JoinResponseMessage m = new JoinResponseMessage(rejection);
-        m.setRecipient(incomingRequest.getMemberID());
-        services.getMessenger().send(m);
-        // a rejected member must not be queued for admission into the next view
-        return;
-      }
-    }
-    recordViewRequest(incomingRequest);
-  }
-
-
-  /**
-   * Processes a Leave request from another member.
-   * <p>
-   * If this member is not the coordinator (and is not stopping or cancelled), it
-   * checks whether it would become the coordinator once the leaving member is
-   * removed from the current view, and takes over if so. Otherwise, unless
-   * stopping or cancelled, it queues the request for the next view.
-   *
-   * @param incomingRequest the leave request
-   */
-  private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
-    NetView v = currentView;
-    if (logger.isDebugEnabled()) {
-      logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
-          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
-    }
-    if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-      logger.debug("JoinLeave is checking to see if I should become coordinator");
-      // With no installed view there is no succession order to consult;
-      // dereferencing v here previously threw a NullPointerException.
-      if (v != null) {
-        NetView check = new NetView(v, v.getViewId()+1);
-        check.remove(incomingRequest.getMemberID());
-        if (check.getCoordinator().equals(localAddress)) {
-          becomeCoordinator(incomingRequest.getMemberID());
-        }
-      }
-    }
-    else {
-      if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-        recordViewRequest(incomingRequest);
-      }
-    }
-  }
-
-
-  /**
-   * Processes a Remove request from another member.
-   * <p>
-   * Requests from senders that are not in the current view are ignored. If this
-   * member is the one being removed, it force-disconnects. If this member is not
-   * the coordinator (and is not stopping or cancelled), it checks whether it would
-   * become the coordinator once the removed member is gone, and takes over if so.
-   * Otherwise, unless stopping or cancelled, it queues the request for the next view.
-   *
-   * @param incomingRequest the remove request
-   */
-  private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
-    NetView v = currentView;
-    if (logger.isDebugEnabled()) {
-      logger.debug("JoinLeave.processRemoveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
-          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
-    }
-    InternalDistributedMember mbr = incomingRequest.getMemberID();
-
-    if (v != null && !v.contains(incomingRequest.getSender())) {
-      logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
-      return;
-    }
-
-    logger.info("Membership received a request to remove " + mbr
-        + "; reason="+incomingRequest.getReason());
-
-    if (mbr.equals(this.localAddress)) {
-      // oops - I've been kicked out
-      services.getManager().forceDisconnect(incomingRequest.getReason());
-      return;
-    }
-
-    if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-      logger.debug("JoinLeave is checking to see if I should become coordinator");
-      // v may be null (see the membership check above); without a view there is
-      // no succession order to consult, and dereferencing it would throw an NPE.
-      if (v != null) {
-        NetView check = new NetView(v, v.getViewId()+1);
-        check.remove(mbr);
-        if (check.getCoordinator().equals(localAddress)) {
-          becomeCoordinator(mbr);
-        }
-      }
-    }
-    else {
-      if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-        recordViewRequest(incomingRequest);
-      }
-    }
-  }
-
-
- /**
- * Appends a join/leave/remove request to viewRequests, to be handled when the
- * next membership view is built, and notifies one thread waiting on the
- * viewRequests monitor.
- */
- private void recordViewRequest(DistributionMessage request) {
- logger.debug("JoinLeave is recording the request to be processed in the next membership view");
- synchronized(viewRequests) {
- viewRequests.add(request);
- // NOTE(review): presumably wakes the ViewCreator thread - confirm
- viewRequests.notify();
- }
- }
-
- /**
- * For testing: returns a copy of the pending view requests, taken while holding
- * the viewRequests monitor, so callers can inspect it without racing the queue.
- */
- List<DistributionMessage> getViewRequests() {
- synchronized(viewRequests) {
- return new LinkedList<DistributionMessage>(viewRequests);
- }
- }
-
- /**
- * Makes this member the membership coordinator with no previous coordinator to
- * list as leaving; equivalent to becomeCoordinator(null).
- */
- private void becomeCoordinator() {
- becomeCoordinator(null);
- }
-
- /**
- * Makes this member the membership coordinator while holding the stateLock
- * write lock. Does nothing if this member is already the coordinator.
- * <p>
- * With no installed view, it installs an initial view created from the local
- * address, sets the local address's view id to 0, and marks this member as
- * joined. Otherwise, it builds and sends a new view that lists oldCoordinator
- * (if any) as leaving. In both cases it then starts the coordinator services.
- *
- * @param oldCoordinator may be null
- */
- private void becomeCoordinator(InternalDistributedMember oldCoordinator) {
- stateLock.writeLock().lock();
- try {
- if (isCoordinator) {
- return;
- }
- logger.info("This member is becoming the membership coordinator with address {}", localAddress);
- isCoordinator = true;
- if (currentView == null) {
- // create the initial membership view
- NetView newView = new NetView(this.localAddress);
- this.localAddress.setVmViewId(0);
- installView(newView);
- isJoined = true;
- startCoordinatorServices();
- } else {
- // create and send out a new view
- NetView newView;
- synchronized(viewInstallationLock) {
- // NOTE(review): the +5 jump in view id is unexplained here; presumably it skips
- // past ids a departed coordinator may have prepared - confirm
- int viewNumber = currentView.getViewId() + 5;
- List<InternalDistributedMember> mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
- // NOTE(review): localAddress is presumably already in currentView's members;
- // confirm NetView tolerates the duplicate entry added here
- mbrs.add(localAddress);
- List<InternalDistributedMember> leaving = new ArrayList<InternalDistributedMember>();
- if (oldCoordinator != null) {
- leaving.add(oldCoordinator);
- }
- newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
- Collections.<InternalDistributedMember>emptyList());
- }
- sendView(newView, Collections.<InternalDistributedMember>emptyList());
- startCoordinatorServices();
- }
- } finally {
- stateLock.writeLock().unlock();
- }
- }
-
-
- /**
- * Sends each newly admitted member a JoinResponseMessage carrying newView.
- */
- private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
- for (InternalDistributedMember mbr: newMbrs) {
- JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
- services.getMessenger().send(response);
- }
- }
-
- /**
- * Sends a RemoveMemberMessage for each member in newMbrs, pairing members with
- * reasons by position.
- * <p>
- * reasons must contain at least as many entries as newMbrs, or reason.next()
- * throws NoSuchElementException. The newView parameter is currently unused.
- */
- private void sendRemoveMessages(List<InternalDistributedMember> newMbrs,
- List<String> reasons, NetView newView) {
- Iterator<String> reason = reasons.iterator();
- for (InternalDistributedMember mbr: newMbrs) {
- // NOTE(review): mbr is passed for both of the first two constructor arguments;
- // presumably recipient and member-to-remove - confirm against RemoveMemberMessage
- RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next());
- services.getMessenger().send(response);
- }
- }
-
-
- /**
- * Sends view as a preparation message through prepareProcessor and waits for
- * responses; returns the result of sendView(view, newMembers, true, prepareProcessor).
- */
- boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) {
- return sendView(view, newMembers, true, this.prepareProcessor);
- }
-
- /**
- * Sends view as a final (non-preparation) view through viewProcessor without
- * waiting for acknowledgements; the boolean result is discarded.
- */
- void sendView(NetView view, Collection<InternalDistributedMember> newMembers) {
- sendView(view, newMembers, false, this.viewProcessor);
- }
-
-
- /**
- * Installs view locally and sends it in an InstallViewMessage.
- * <p>
- * Recipients are all members of the view except newMembers and this member,
- * plus the view's crashed members. When preparing, waits on rp for responses
- * and returns false if a conflicting view was reported or some members failed to
- * respond (unless a cancel is in progress).
- *
- * @return true if there were no recipients, if not preparing, or if preparation
- *         completed without conflicts or non-responders
- */
- boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
- int id = view.getViewId();
- InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(), preparing);
- Set<InternalDistributedMember> recips = new HashSet<InternalDistributedMember>(view.getMembers());
- recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
- recips.remove(this.localAddress); // no need to send it to ourselves
- // NOTE(review): the view is installed even when preparing, so a preparation that
- // returns false below leaves the unconfirmed view installed - confirm intended
- installView(view);
- recips.addAll(view.getCrashedMembers());
- if (recips.isEmpty()) {
- return true;
- }
- msg.setRecipients(recips);
- rp.initialize(id, recips);
-
- logger.info((preparing? "preparing" : "sending") + " new view " + view);
- services.getMessenger().send(msg);
-
- // only wait for responses during preparation
- if (preparing) {
- Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
-
- logger.info("View Creator is finished waiting for responses to view preparation");
-
- InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
- NetView conflictingView = rp.getConflictingView();
- if (conflictingView != null) {
- logger.warn("View Creator received a conflicting membership view from " + conflictingViewSender
- + " during preparation: " + conflictingView);
- return false;
- }
-
- // non-responders only fail preparation when no cancellation is under way
- if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) {
- logger.warn("these members failed to respond to the view change: " + failedToRespond);
- return false;
- }
- }
-
- return true;
- }
-
-
-
- private void processViewMessage(InstallViewMessage m) {
- NetView view = m.getView();
-
- if (currentView != null && view.getViewId() < currentView.getViewId()) {
- // ignore old views
- ackView(m);
- return;
- }
-
-
- if (m.isPreparing()) {
- if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
- services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
- }
- else {
- this.preparedView = view;
- ackView(m);
- }
- }
- else { // !preparing
- if (currentView != null && !view.contains(this.localAddress)) {
- if (quorumRequired) {
- services.getManager().forceDisconnect("This node is no longer in the membership view");
- }
- }
- else {
- ackView(m);
- installView(view);
- }
- }
- }
-
-
- private void ackView(InstallViewMessage m) {
- if (m.getView().contains(m.getView().getCreator())) {
- services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
- }
- }
-
-
- private void processViewAckMessage(ViewAckMessage m) {
- if (m.isPrepareAck()) {
- this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
- } else {
- this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
- }
- }
-
- /**
- * This contacts the locators to find out who the current coordinator is.
- * All locators are contacted. If they don't agree then we choose the oldest
- * coordinator and return it.
- * @return
- */
- private InternalDistributedMember findCoordinator() {
- assert this.localAddress != null;
-
- FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress);
- Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
- long giveUpTime = System.currentTimeMillis() + (services.getConfig().getLocatorWaitTime() * 1000L);
- boolean anyResponses = false;
-
- do {
- for (InetSocketAddress addr: locators) {
- try {
- Object o = TcpClient.requestToServer(
- addr.getAddress(), addr.getPort(), request, services.getConfig().getJoinTimeout(),
- true);
- FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
- if (response != null && response.getCoordinator() != null) {
- anyResponses = false;
- coordinators.add(response.getCoordinator());
- if (response.isFromView()) {
- GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
- services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
- if (response.isUsePreferredCoordinators()
- && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
- mbr.setPreferredForCoordinator(false);
- }
- }
- }
- } catch (IOException | ClassNotFoundException problem) {
- }
- }
- if (coordinators.isEmpty()) {
- return null;
- }
- if (!anyResponses) {
- try { Thread.sleep(2000); } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- }
- }
- } while (!anyResponses && System.currentTimeMillis() < giveUpTime);
-
- Iterator<InternalDistributedMember> it = coordinators.iterator();
- if (coordinators.size() == 1) {
- return it.next();
- }
- InternalDistributedMember oldest = it.next();
- while (it.hasNext()) {
- InternalDistributedMember candidate = it.next();
- if (oldest.compareTo(candidate) > 0) {
- oldest = candidate;
- }
- }
- return oldest;
- }
-
-
- /**
- * receives a JoinResponse holding a membership view or rejection message
- * @param rsp
- */
- private void processJoinResponse(JoinResponseMessage rsp) {
- synchronized(joinResponse) {
- joinResponse[0] = rsp;
- joinResponse.notify();
- }
- }
-
- @Override
- public NetView getView() {
- return currentView;
- }
-
- @Override
- public InternalDistributedMember getMemberID() {
- return this.localAddress;
- }
-
- public void installView(NetView newView) {
- synchronized(viewInstallationLock) {
- if (currentView != null && currentView.getViewId() >= newView.getViewId()) {
- // old view - ignore it
- return;
- }
-
- if (checkForPartition(newView)) {
- if (quorumRequired) {
- List<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
- services.getManager().forceDisconnect(
- LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
- }
- return;
- }
-
- currentView = newView;
- preparedView = null;
- lastConflictingView = null;
- services.installView(newView);
-
- if (!newView.getCreator().equals(this.localAddress)) {
- if (newView.shouldBeCoordinator(this.localAddress)) {
- becomeCoordinator();
- } else if (this.isCoordinator) {
- // stop being coordinator
- stateLock.writeLock().lock();
- try {
- stopCoordinatorServices();
- this.isCoordinator = false;
- } finally {
- stateLock.writeLock().unlock();
- }
- }
- }
- if (!this.isCoordinator) {
- // get rid of outdated requests. It's possible some requests are
- // newer than the view just processed - the senders will have to
- // resend these
- synchronized(viewRequests) {
- for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
- DistributionMessage m = it.next();
- if (m instanceof JoinRequestMessage) {
- it.remove();
- } else if (m instanceof LeaveRequestMessage) {
- if (!currentView.contains(((LeaveRequestMessage)m).getMemberID())) {
- it.remove();
- }
- } else if (m instanceof RemoveMemberMessage) {
- if (!currentView.contains(((RemoveMemberMessage)m).getMemberID())) {
- it.remove();
- }
- }
- }
- }
- }
- }
- }
-
-
- /**
- * check to see if the new view shows a drop of 51% or more
- */
- private boolean checkForPartition(NetView newView) {
- if (currentView == null) {
- return false;
- }
- int oldWeight = currentView.memberWeight();
- int failedWeight = newView.getCrashedMemberWeight(currentView);
- if (failedWeight > 0) {
- if (logger.isInfoEnabled()) {
- newView.logCrashedMemberWeights(currentView, logger);
- }
- int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0);
- if (failedWeight > failurePoint) {
- services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView);
- return true;
- }
- }
- return false;
- }
-
-
- /** invoke this under the viewInstallationLock */
- private void startCoordinatorServices() {
- if (viewCreator == null || viewCreator.isShutdown()) {
- viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
- viewCreator.setDaemon(true);
- viewCreator.start();
- }
- }
-
- private void stopCoordinatorServices() {
- if (viewCreator != null && !viewCreator.isShutdown()) {
- viewCreator.shutdown();
- }
- }
-
- public static void loadEmergencyClasses() {
- }
-
- @Override
- public void emergencyClose() {
- isStopping = true;
- isJoined = false;
- stopCoordinatorServices();
- isCoordinator = false;
- currentView = null;
- }
-
- public void beSick() {
- }
-
- public void playDead() {
- }
-
- public void beHealthy() {
- }
-
-
-
- @Override
- public void start() {
- }
-
-
-
- @Override
- public void started() {
- this.localAddress = services.getMessenger().getMemberID();
- }
-
-
-
- @Override
- public void stop() {
- logger.debug("JoinLeave stopping");
- leave();
- }
-
-
-
- @Override
- public void stopped() {
- // TODO Auto-generated method stub
-
- }
-
-
-
- @Override
- public void leave() {
- synchronized(viewInstallationLock) {
- NetView view = currentView;
- isStopping = true;
- if (view != null) {
- if (view.size() > 1) {
- if (this.isCoordinator) {
- logger.debug("JoinLeave stopping coordination services");
- stopCoordinatorServices();
- NetView newView = new NetView(view, view.getViewId()+1);
- newView.remove(localAddress);
- InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials());
- m.setRecipients(newView.getMembers());
- services.getMessenger().send(m);
- try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); }
- catch (InterruptedException e) { Thread.currentThread().interrupt(); }
- }
- else {
- logger.debug("JoinLeave sending a leave request to {}", view.getCoordinator());
- LeaveRequestMessage m = new LeaveRequestMessage(view.getCoordinator(), this.localAddress);
- services.getMessenger().send(m);
- }
- } // view.size
- }// view != null
- }
-
- }
-
-
-
- @Override
- public void remove(InternalDistributedMember m, String reason) {
- NetView v = this.currentView;
- if (v != null) {
- RemoveMemberMessage msg = new RemoveMemberMessage(v.getCoordinator(), m,
- reason);
- services.getMessenger().send(msg);
- }
- }
-
- @Override
- public void disableDisconnectOnQuorumLossForTesting() {
- this.quorumRequired = false;
- }
-
- @Override
- public void init(Services s) {
- this.services = s;
- services.getMessenger().addHandler(JoinRequestMessage.class, this);
- services.getMessenger().addHandler(JoinResponseMessage.class, this);
- services.getMessenger().addHandler(InstallViewMessage.class, this);
- services.getMessenger().addHandler(ViewAckMessage.class, this);
- services.getMessenger().addHandler(LeaveRequestMessage.class, this);
- services.getMessenger().addHandler(RemoveMemberMessage.class, this);
- services.getMessenger().addHandler(JoinRequestMessage.class, this);
- services.getMessenger().addHandler(JoinResponseMessage.class, this);
-
- DistributionConfig dc = services.getConfig().getDistributionConfig();
- int ackCollectionTimeout = dc.getMemberTimeout() * 2 * 12437 / 10000;
- if (ackCollectionTimeout < 1500) {
- ackCollectionTimeout = 1500;
- } else if (ackCollectionTimeout > 12437) {
- ackCollectionTimeout = 12437;
- }
- ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
- this.viewAckTimeout = ackCollectionTimeout;
-
- this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
-
- DistributionConfig dconfig = services.getConfig().getDistributionConfig();
- String bindAddr = dconfig.getBindAddress();
- locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
- }
-
- @Override
- public void processMessage(DistributionMessage m) {
- if (isStopping) {
- return;
- }
- logger.debug("JoinLeave processing {}", m);
- switch (m.getDSFID()) {
- case JOIN_REQUEST:
- processJoinRequest((JoinRequestMessage)m);
- break;
- case JOIN_RESPONSE:
- processJoinResponse((JoinResponseMessage)m);
- break;
- case INSTALL_VIEW_MESSAGE:
- processViewMessage((InstallViewMessage)m);
- break;
- case VIEW_ACK_MESSAGE:
- processViewAckMessage((ViewAckMessage)m);
- break;
- case LEAVE_REQUEST_MESSAGE:
- processLeaveRequest((LeaveRequestMessage)m);
- break;
- case REMOVE_MEMBER_MESSAGE:
- processRemoveRequest((RemoveMemberMessage)m);
- break;
- default:
- throw new IllegalArgumentException("unknown message type: " + m);
- }
- }
-
-
-
-
- class ViewReplyProcessor {
- volatile int viewId = -1;
- volatile Set<InternalDistributedMember> recipients;
- volatile NetView conflictingView;
- volatile InternalDistributedMember conflictingViewSender;
- volatile boolean waiting;
- final boolean isPrepareViewProcessor;
-
- ViewReplyProcessor(boolean forPreparation) {
- this.isPrepareViewProcessor = forPreparation;
- }
-
- void initialize(int viewId, Set<InternalDistributedMember> recips) {
- this.waiting = true;
- this.viewId = viewId;
- this.recipients = recips;
- this.conflictingView = null;
- }
-
- void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
- if (!this.waiting) {
- return;
- }
-
- if (viewId == this.viewId) {
- if (conflictingView != null) {
- this.conflictingViewSender = sender;
- this.conflictingView = conflictingView;
- }
-
- Set<InternalDistributedMember> waitingFor = this.recipients;
- synchronized(waitingFor) {
- waitingFor.remove(sender);
- if (waitingFor.isEmpty()) {
- logger.debug("All view responses received - notifying waiting thread");
- waitingFor.notify();
- }
- }
-
- }
- }
-
- Set<InternalDistributedMember> waitForResponses() {
- Set<InternalDistributedMember> result = this.recipients;
- long endOfWait = System.currentTimeMillis() + viewAckTimeout;
- try {
- while (System.currentTimeMillis() < endOfWait
- && (services.getCancelCriterion().cancelInProgress() == null)) {
- try {
- synchronized(result) {
- result.wait(1000);
- }
- } catch (InterruptedException e) {
- logger.debug("Interrupted while waiting for view resonses");
- Thread.currentThread().interrupt();
- return result;
- }
- }
- } finally {
- this.waiting = false;
- }
- return result;
- }
-
- NetView getConflictingView() {
- return this.conflictingView;
- }
-
- InternalDistributedMember getConflictingViewSender() {
- return this.conflictingViewSender;
- }
-
- Set<InternalDistributedMember> getUnresponsiveMembers() {
- return this.recipients;
- }
- }
-
-
-
-
-
- class ViewCreator extends Thread {
- boolean shutdown = false;
-
- ViewCreator(String name, ThreadGroup tg) {
- super(tg, name);
- }
-
- void shutdown() {
- shutdown = true;
- synchronized(viewRequests) {
- viewRequests.notify();
- interrupt();
- }
- }
-
- boolean isShutdown() {
- return shutdown;
- }
-
- @Override
- public void run() {
- List<DistributionMessage> requests = null;
- logger.info("View Creator thread is starting");
- long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
- try {
- for (;;) {
- synchronized(viewRequests) {
- if (shutdown) {
- return;
- }
- if (viewRequests.isEmpty()) {
- try {
- logger.debug("View Creator is waiting for requests");
- viewRequests.wait();
- } catch (InterruptedException e) {
- return;
- }
- } else {
- if (System.currentTimeMillis() < okayToCreateView) {
- // sleep to let more requests arrive
- try {
- sleep(100);
- continue;
- } catch (InterruptedException e) {
- return;
- }
- } else {
- // time to create a new membership view
- if (requests == null) {
- requests = new ArrayList<DistributionMessage>(viewRequests);
- } else {
- requests.addAll(viewRequests);
- }
- viewRequests.clear();
- okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
- }
- }
- } // synchronized
- if (requests != null && !requests.isEmpty()) {
- logger.debug("View Creator is processing {} requests for the next membership view", requests.size());
- /*boolean success = */createAndSendView(requests);
- requests = null;
- }
- }
- } finally {
- shutdown = true;
- }
- }
-
- /**
- * Create a new membership view and send it to members (including crashed members).
- * Returns false if the view cannot be prepared successfully, true otherwise
- */
- boolean createAndSendView(List<DistributionMessage> requests) {
- List<InternalDistributedMember> joinReqs = new ArrayList<InternalDistributedMember>();
- List<InternalDistributedMember> leaveReqs = new ArrayList<InternalDistributedMember>();
- List<InternalDistributedMember> removalReqs = new ArrayList<InternalDistributedMember>();
- List<String> removalReasons = new ArrayList<String>();
-
- for (DistributionMessage msg: requests) {
- logger.debug("processing request {}", msg);
- if (msg instanceof JoinRequestMessage) {
- InternalDistributedMember mbr = ((JoinRequestMessage)msg).getMemberID();
- joinReqs.add(mbr);
- }
- else if (msg instanceof LeaveRequestMessage) {
- leaveReqs.add(((LeaveRequestMessage) msg).getMemberID());
- }
- else if (msg instanceof RemoveMemberMessage) {
- removalReqs.add(((RemoveMemberMessage) msg).getMemberID());
- removalReasons.add(((RemoveMemberMessage) msg).getReason());
- }
- else {
- // TODO: handle removals
- logger.warn("Unknown membership request encountered: {}", msg);
- }
- }
-
- NetView newView;
- synchronized(viewInstallationLock) {
- int viewNumber = 0;
- List<InternalDistributedMember> mbrs;
- if (currentView == null) {
- mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size());
- } else {
- viewNumber = currentView.getViewId()+1;
- mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
- }
- mbrs.addAll(joinReqs);
- mbrs.removeAll(leaveReqs);
- mbrs.removeAll(removalReqs);
- newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs,
- removalReqs);
- }
-
- for (InternalDistributedMember mbr: joinReqs) {
- mbr.setVmViewId(newView.getViewId());
- }
- // send removal messages before installing the view so we stop
- // getting messages from members that have been kicked out
- sendRemoveMessages(removalReqs, removalReasons, newView);
-
- // we want to always check for quorum loss but don't act on it
- // unless network-partition-detection is enabled
- if ( !(checkForPartition(newView) && quorumRequired) ) {
- sendJoinResponses(joinReqs, newView);
- }
-
- if (quorumRequired) {
- boolean prepared = false;
- do {
- if (this.shutdown || Thread.currentThread().isInterrupted()) {
- return false;
- }
- prepared = prepareView(newView, joinReqs);
- if (!prepared && quorumRequired) {
- Set<InternalDistributedMember> unresponsive = prepareProcessor.getUnresponsiveMembers();
- try {
- removeHealthyMembers(unresponsive);
- } catch (InterruptedException e) {
- // abort the view if interrupted
- shutdown = true;
- return false;
- }
-
- if (!unresponsive.isEmpty()) {
- List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(currentView.getCrashedMembers().size() + unresponsive.size());
- failures.addAll(unresponsive);
-
- NetView conflictingView = prepareProcessor.getConflictingView();
- if (conflictingView != null
- && !conflictingView.getCreator().equals(localAddress)
- && conflictingView.getViewId() > newView.getViewId()
- && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
- lastConflictingView = conflictingView;
- failures.addAll(conflictingView.getCrashedMembers());
- }
-
- failures.removeAll(removalReqs);
- if (failures.size() > 0) {
- // abort the current view and try again
- removalReqs.addAll(failures);
- newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
- removalReqs);
- }
- }
- }
- } while (!prepared);
- } // quorumRequired
-
- lastConflictingView = null;
-
- sendView(newView, joinReqs);
- return true;
- }
-
- /**
- * performs health checks on the collection of members, removing any that
- * are found to be healthy
- * @param mbrs
- */
- private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
- List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
-
- for (InternalDistributedMember mbr: mbrs) {
- final InternalDistributedMember fmbr = mbr;
- checkers.add(new Callable<InternalDistributedMember>() {
- @Override
- public InternalDistributedMember call() throws Exception {
- // return the member id if it fails health checks
- logger.info("checking state of member " + fmbr);
- if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
- logger.info("member " + fmbr + " passed availability check");
- return fmbr;
- }
- logger.info("member " + fmbr + " failed availability check");
- return null;
- }
- });
- }
-
- ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
- AtomicInteger i = new AtomicInteger();
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(Services.getThreadGroup(), r,
- "Member verification thread " + i.incrementAndGet());
- }
- });
-
- try {
- List<Future<InternalDistributedMember>> futures;
- futures = svc.invokeAll(checkers);
-
- for (Future<InternalDistributedMember> future: futures) {
- try {
- InternalDistributedMember mbr = future.get(viewAckTimeout, TimeUnit.MILLISECONDS);
- if (mbr != null) {
- logger.debug("disregarding lack of acknowledgement from {}", mbr);
- mbrs.remove(mbr);
- }
- } catch (java.util.concurrent.TimeoutException e) {
- // TODO should the member be removed if we can't verify it in time?
- } catch (ExecutionException e) {
- logger.info("unexpected exception caught during member verification", e);
- }
- }
- } finally {
- svc.shutdownNow();
- }
- }
- }
-
-}
+package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+
+import static com.gemstone.gemfire.distributed.internal.DistributionManager.LOCATOR_DM_TYPE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemConnectException;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+
+/**
+ * GMSJoinLeave handles membership communication with other processes in the
+ * distributed system. It replaces the JGroups channel membership services
+ * that Geode formerly used for this purpose.
+ *
+ */
+public class GMSJoinLeave implements JoinLeave, MessageHandler {
+
+ /** number of times to try joining before giving up (system property gemfire.join-attempts, default 4) */
+ private static final int JOIN_ATTEMPTS = Integer.getInteger("gemfire.join-attempts", 4);
+
+ /** milliseconds to sleep before retrying a join after a failed attempt (gemfire.join-retry-sleep, default 1000) */
+ private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
+
+ /**
+  * milliseconds to wait for a view to be acked by all members before performing suspect
+  * processing on non-responders (gemfire.view-ack-timeout, default 12500).
+  * NOTE(review): this constant is not referenced in the code visible here; view-reply
+  * waits in the pre-image use the viewAckTimeout field instead - confirm it is still needed.
+  */
+ private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("gemfire.view-ack-timeout", 12500);
+
+ /** milliseconds to stall so concurrent join/leave/remove requests can be collected into one view (default 2000) */
+ private static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 2000);
+
+ /** milliseconds to sleep after sending a leave-related message so it can be transmitted before shutdown proceeds (default 2000) */
+ private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 2000);
+
+ /** membership logger */
+ private static final Logger logger = Services.getLogger();
+
+
+ /** the view ID in which this member entered the membership, taken from the coordinator's join response */
+ private int birthViewId;
+
+ /** this member's address */
+ private InternalDistributedMember localAddress;
+
+ /** the membership services container this component belongs to */
+ private Services services;
+
+ /** have I connected to the distributed system? join() returns this after exhausting its attempts */
+ private boolean isJoined;
+
+ /** a lock governing GMS state such as isCoordinator */
+ private ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+ /**
+  * whether this member is currently the membership coordinator. Documented as guarded by
+  * stateLock. NOTE(review): some readers (e.g. processLeaveRequest) read it without the lock.
+  */
+ private boolean isCoordinator;
+
+ /** monitor that serializes view installation */
+ private final Object viewInstallationLock = new Object();
+
+ /** the currently installed membership view; null until a view has been installed */
+ private volatile NetView currentView;
+
+ /** a view that has been prepared but not yet installed */
+ private NetView preparedView;
+
+ /** the last view that conflicted with view preparation */
+ private NetView lastConflictingView;
+
+ /** addresses of the locators consulted when searching for the coordinator */
+ private List<InetSocketAddress> locators;
+
+ /**
+  * pending join/leave/remove requests awaiting the next membership view. The list is
+  * also the monitor used to wake a thread waiting for new requests.
+  */
+ private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
+
+ /**
+  * single-slot mailbox holding the reply to this member's join request; the array
+  * object itself is the monitor used for wait/notify on that reply
+  */
+ private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
+
+ /** collects responses to new views */
+ private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
+
+ /** collects responses to view preparation messages */
+ private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
+
+ /** whether quorum checks can cause a forced-disconnect */
+ private boolean quorumRequired = false;
+
+ /** milliseconds to wait for view acknowledgements */
+ private int viewAckTimeout;
+
+ /** background thread that creates new membership views */
+ private ViewCreator viewCreator;
+
+ /** am I shutting down? */
+ private volatile boolean isStopping;
+
+
+  /**
+   * Attempts to join the distributed system.
+   * <p>
+   * A locator started with the system property {@code gemfire.first-member=true}
+   * becomes coordinator immediately. Otherwise, up to {@code JOIN_ATTEMPTS}
+   * times, this asks the locators for the current coordinator and sends that
+   * member a join request. If the locators name this member itself as the
+   * coordinator, it only takes on that role once more than half of the
+   * attempts have been used (presumably to give an existing coordinator a
+   * chance to surface first). Between attempts it sleeps for
+   * {@code JOIN_RETRY_SLEEP} milliseconds.
+   *
+   * @return true if this member joined or became coordinator; false if the
+   *         thread was interrupted (interrupt status is preserved), otherwise
+   *         the value of {@code isJoined} once all attempts are used
+   */
+  public boolean join() {
+
+    if (this.localAddress.getVmKind() == LOCATOR_DM_TYPE
+        && Boolean.getBoolean("gemfire.first-member")) {
+      becomeCoordinator();
+      return true;
+    }
+
+    for (int tries = 0; tries < JOIN_ATTEMPTS; tries++) {
+      InternalDistributedMember coord = findCoordinator();
+      logger.debug("found possible coordinator {}", coord);
+      if (coord != null) {
+        if (coord.equals(this.localAddress)) {
+          if (tries > (JOIN_ATTEMPTS / 2)) {
+            becomeCoordinator();
+            return true;
+          }
+        } else {
+          if (attemptToJoin(coord)) {
+            return true;
+          }
+        }
+      }
+      try {
+        Thread.sleep(JOIN_RETRY_SLEEP);
+      } catch (InterruptedException e) {
+        // restore the interrupt so callers (and later blocking calls) see it
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    } // for
+    return this.isJoined;
+  }
+
+  /**
+   * Sends a join request to {@code coord} and waits for the reply.
+   * <p>
+   * The wait is bounded by the configured join timeout divided by
+   * {@code JOIN_ATTEMPTS}. A reply carrying a rejection message is turned into
+   * an exception. A reply carrying a membership view causes this member to
+   * adopt the view ID, split-brain setting and coordinator preference assigned
+   * by the coordinator, and then to install the view.
+   *
+   * @param coord the member believed to be the membership coordinator
+   * @return true if a view was received and installed; false if no reply
+   *         arrived in time, the reply carried no view, or the thread was
+   *         interrupted (the interrupt status is preserved)
+   * @throws SystemConnectException if the rejection indicates a product-version
+   *         mismatch
+   * @throws AuthenticationFailedException if the coordinator rejected this
+   *         member for any other reason
+   */
+  private boolean attemptToJoin(InternalDistributedMember coord) {
+    // send a join request to the coordinator and wait for a response
+    logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
+    JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress,
+        services.getAuthenticator().getCredentials(coord));
+
+    services.getMessenger().send(req);
+
+    // never let the per-attempt timeout round down to 0: Object.wait(0) blocks forever
+    long waitTime = Math.max(1L, services.getConfig().getJoinTimeout() / JOIN_ATTEMPTS);
+    long giveUpTime = System.currentTimeMillis() + waitTime;
+
+    JoinResponseMessage response;
+    synchronized(joinResponse) {
+      // wait in a loop so a spurious wakeup does not end the wait early
+      long remaining = waitTime;
+      while (joinResponse[0] == null && remaining > 0) {
+        try {
+          joinResponse.wait(remaining);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        }
+        remaining = giveUpTime - System.currentTimeMillis();
+      }
+      response = joinResponse[0];
+      // consume the reply while still holding the monitor that guards the slot,
+      // so a reply stored concurrently is never overwritten by an unlocked write
+      joinResponse[0] = null;
+    }
+
+    if (response == null) {
+      return false;
+    }
+    String failReason = response.getRejectionMessage();
+    if (failReason != null) {
+      // NOTE(review): "15806" appears to identify a version-rejection message; confirm
+      if (failReason.contains("Rejecting the attempt of a member using an older version")
+          || failReason.contains("15806")) {
+        throw new SystemConnectException(failReason);
+      }
+      throw new AuthenticationFailedException(failReason);
+    }
+    if (response.getCurrentView() == null) {
+      return false;
+    }
+    this.birthViewId = response.getMemberID().getVmViewId();
+    this.localAddress.setVmViewId(this.birthViewId);
+    GMSMember me = (GMSMember)this.localAddress.getNetMember();
+    GMSMember o = (GMSMember)response.getMemberID().getNetMember();
+    me.setSplitBrainEnabled(o.isSplitBrainEnabled());
+    me.setPreferredForCoordinator(o.preferredForCoordinator());
+    installView(response.getCurrentView());
+    return true;
+  }
+
+
+  /**
+   * Processes a join request from a prospective member.
+   * <p>
+   * Requests from members running an older product version are rejected.
+   * Every other request is passed to the configured Authenticator. If the
+   * authenticator returns a non-empty rejection message, or throws, a
+   * JoinResponseMessage carrying the reason is sent back and the request is
+   * dropped. Accepted requests are queued for the next membership view.
+   *
+   * @param incomingRequest the join request received from the prospective member
+   */
+  private void processJoinRequest(JoinRequestMessage incomingRequest) {
+    InternalDistributedMember joiner = incomingRequest.getMemberID();
+    if (joiner.getVersionObject().compareTo(Version.CURRENT) < 0) {
+      logger.warn("detected an attempt to start a peer using an older version of the product {}",
+          joiner);
+      rejectJoinRequest(joiner, "Rejecting the attempt of a member using an older version");
+      return;
+    }
+
+    // Always consult the authenticator, even when no credentials were sent.
+    // Skipping it for null credentials would let a peer bypass authentication
+    // just by omitting them; GMSAuthenticator has a dedicated
+    // "missing credentials" message for that case.
+    String rejection;
+    try {
+      rejection = services.getAuthenticator().authenticate(joiner, incomingRequest.getCredentials());
+    } catch (Exception e) {
+      // a failure must always reject, even if the exception carries no message
+      rejection = (e.getMessage() != null) ? e.getMessage() : e.toString();
+    }
+    if (rejection != null && rejection.length() > 0) {
+      rejectJoinRequest(joiner, rejection);
+      // a rejected member must not be added to the next view
+      return;
+    }
+    recordViewRequest(incomingRequest);
+  }
+
+  /** Sends a JoinResponseMessage carrying {@code reason} back to {@code joiner}. */
+  private void rejectJoinRequest(InternalDistributedMember joiner, String reason) {
+    JoinResponseMessage m = new JoinResponseMessage(reason);
+    m.setRecipient(joiner);
+    services.getMessenger().send(m);
+  }
+
+
+  /**
+   * Processes a leave request from another member.
+   * <p>
+   * Ignored while this member is stopping or a cancellation is in progress.
+   * The coordinator queues the request for the next view. A non-coordinator
+   * checks whether the departure would make it the coordinator and, if so,
+   * becomes coordinator. A non-coordinator with no installed view ignores the
+   * request, since there is no view from which to determine succession.
+   *
+   * @param incomingRequest the leave request
+   */
+  private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
+    NetView v = currentView;
+    if (logger.isDebugEnabled()) {
+      logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
+    }
+    if (isStopping || services.getCancelCriterion().isCancelInProgress()) {
+      return;
+    }
+    InternalDistributedMember leaving = incomingRequest.getMemberID();
+    if (isCoordinator) {
+      recordViewRequest(incomingRequest);
+    } else if (v == null) {
+      // no membership view yet, so there is no coordinator succession to evaluate
+      logger.debug("JoinLeave ignoring leave request from {} - no membership view installed", leaving);
+    } else {
+      logger.debug("JoinLeave is checking to see if I should become coordinator");
+      NetView check = new NetView(v, v.getViewId()+1);
+      check.remove(leaving);
+      // getCoordinator() may be null if the view is now empty
+      if (localAddress.equals(check.getCoordinator())) {
+        becomeCoordinator(leaving);
+      }
+    }
+  }
+
+
+  /**
+   * Processes a request to remove a member from the distributed system.
+   * <p>
+   * Requests sent by a process that is not in the current view are ignored.
+   * A request to remove this member forces a disconnect. Otherwise, unless
+   * this member is stopping or a cancellation is in progress:
+   * <ul>
+   * <li>the coordinator queues the request for the next view;</li>
+   * <li>a non-coordinator checks whether the removal makes it the new
+   * coordinator;</li>
+   * <li>a non-coordinator with no installed view ignores the request.</li>
+   * </ul>
+   *
+   * @param incomingRequest the removal request
+   */
+  private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
+    NetView v = currentView;
+    if (logger.isDebugEnabled()) {
+      logger.debug("JoinLeave.processRemoveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
+    }
+    InternalDistributedMember mbr = incomingRequest.getMemberID();
+
+    if (v != null && !v.contains(incomingRequest.getSender())) {
+      logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
+      return;
+    }
+
+    logger.info("Membership received a request to remove " + mbr
+        + "; reason="+incomingRequest.getReason());
+
+    if (mbr.equals(this.localAddress)) {
+      // oops - I've been kicked out
+      services.getManager().forceDisconnect(incomingRequest.getReason());
+      return;
+    }
+
+    if (isStopping || services.getCancelCriterion().isCancelInProgress()) {
+      return;
+    }
+
+    if (isCoordinator) {
+      recordViewRequest(incomingRequest);
+    } else if (v == null) {
+      // no membership view yet, so there is no coordinator succession to evaluate
+      logger.debug("JoinLeave ignoring removal of {} - no membership view installed", mbr);
+    } else {
+      logger.debug("JoinLeave is checking to see if I should become coordinator");
+      NetView check = new NetView(v, v.getViewId()+1);
+      check.remove(mbr);
+      // getCoordinator() may be null if the view is now empty
+      if (localAddress.equals(check.getCoordinator())) {
+        becomeCoordinator(mbr);
+      }
+    }
+  }
+
+
+ private void recordViewRequest(DistributionMessage request) {
+ logger.debug("JoinLeave is recording the request to be processed in the next membership view");
+ synchronized(viewRequests) {
+ viewRequests.add(request);
+ viewRequests.notify();
+ }
+ }
+
+ //for testing purposes, returns a copy of the view requests for verification
+ List<DistributionMessage> getViewRequests() {
+ synchronized(viewRequests) {
+ return new LinkedList<DistributionMessage>(viewRequests);
+ }
+ }
+
+ /**
+ * Yippeee - I get to be the coordinator
+ */
+ private void becomeCoordinator() {
+ becomeCoordinator(null);
+ }
+
+ /**
+ * @param oldCoordinator may be null
+ */
+ private void becomeCoordinator(InternalDistributedMember oldCoordinator) {
+ stateLock.writeLock().lock();
+ try {
+ if (isCoordinator) {
+ return;
+ }
+ logger.info("This member is becoming the membership coordinator with address {}", localAddress);
+ isCoordinator = true;
+ if (currentView == null) {
+ // create the initial membership view
+ NetView newView = new NetView(this.localAddress);
+ this.localAddress.setVmViewId(0);
+ installView(newView);
+ isJoined = true;
+ startCoordinatorServices();
+ } else {
+ // create and send out a new view
+ NetView newView;
+ synchronized(viewInstallationLock) {
+ int viewNumber = currentView.getViewId() + 5;
+ List<InternalDistributedMember> mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
+ mbrs.add(localAddress);
+ List<InternalDistributedMember> leaving = new ArrayList<InternalDistributedMember>();
+ if (oldCoordinator != null) {
+ leaving.add(oldCoordinator);
+ }
+ newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
+ Collections.<InternalDistributedMember>emptyList());
+ }
+ sendView(newView, Collections.<InternalDistributedMember>emptyList());
+ startCoordinatorServices();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+
+ private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
+ for (InternalDistributedMember mbr: newMbrs) {
+ JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+ services.getMessenger().send(response);
+ }
+ }
+
+ private void sendRemoveMessages(List<InternalDistributedMember> newMbrs,
+ List<String> reasons, NetView newView) {
+ Iterator<String> reason = reasons.iterator();
+ for (InternalDistributedMember mbr: newMbrs) {
+ RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next());
+ services.getMessenger().send(response);
+ }
+ }
+
+
+ boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) {
+ return sendView(view, newMembers, true, this.prepareProcessor);
+ }
+
+ void sendView(NetView view, Collection<InternalDistributedMember> newMembers) {
+ sendView(view, newMembers, false, this.viewProcessor);
+ }
+
+
+ boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
+ int id = view.getViewId();
+ InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
+ Set<InternalDistributedMember> recips = new HashSet<InternalDistributedMember>(view.getMembers());
+ recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
+ recips.remove(this.localAddress); // no need to send it to ourselves
+ installView(view);
+ recips.addAll(view.getCrashedMembers());
+ if (recips.isEmpty()) {
+ return true;
+ }
+ msg.setRecipients(recips);
+ rp.initialize(id, recips);
+
+ logger.info((preparing? "preparing" : "sending") + " new view " + view);
+ services.getMessenger().send(msg);
+
+ // only wait for responses during preparation
+ if (preparing) {
+ Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
+
+ logger.info("View Creator is finished waiting for responses to view preparation");
+
+ InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
+ NetView conflictingView = rp.getConflictingView();
+ if (conflictingView != null) {
+ logger.warn("View Creator received a conflicting membership view from " + conflictingViewSender
+ + " during preparation: " + conflictingView);
+ return false;
+ }
+
+ if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) {
+ logger.warn("these members failed to respond to the view change: " + failedToRespond);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+
+ private void processViewMessage(InstallViewMessage m) {
+ NetView view = m.getView();
+
+ if (currentView != null && view.getViewId() < currentView.getViewId()) {
+ // ignore old views
+ ackView(m);
+ return;
+ }
+
+
+ if (m.isPreparing()) {
+ if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
+ services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
+ }
+ else {
+ this.preparedView = view;
+ ackView(m);
+ }
+ }
+ else { // !preparing
+ if (currentView != null && !view.contains(this.localAddress)) {
+ if (quorumRequired) {
+ services.getManager().forceDisconnect("This node is no longer in the membership view");
+ }
+ }
+ else {
+ ackView(m);
+ installView(view);
+ }
+ }
+ }
+
+
+ private void ackView(InstallViewMessage m) {
+ if (m.getView().contains(m.getView().getCreator())) {
+ services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
+ }
+ }
+
+
+ private void processViewAckMessage(ViewAckMessage m) {
+ if (m.isPrepareAck()) {
+ this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+ } else {
+ this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+ }
+ }
+
+ /**
+ * This contacts the locators to find out who the current coordinator is.
+ * All locators are contacted. If they don't agree then we choose the oldest
+ * coordinator and return it.
+ * @return
+ */
+ private InternalDistributedMember findCoordinator() {
+ assert this.localAddress != null;
+
+ FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress);
+ Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
+ long giveUpTime = System.currentTimeMillis() + (services.getConfig().getLocatorWaitTime() * 1000L);
+ boolean anyResponses = false;
+
+ do {
+ for (InetSocketAddress addr: locators) {
+ try {
+ Object o = TcpClient.requestToServer(
+ addr.getAddress(), addr.getPort(), request, services.getConfig().getJoinTimeout(),
+ true);
+ FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
+ if (response != null && response.getCoordinator() != null) {
+ anyResponses = false;
+ coordinators.add(response.getCoordinator());
+ if (response.isFromView()) {
+ GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
+ services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
+ if (response.isUsePreferredCoordinators()
+ && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
+ mbr.setPreferredForCoordinator(false);
+ }
+ }
+ }
+ } catch (IOException | ClassNotFoundException problem) {
+ }
+ }
+ if (coordinators.isEmpty()) {
+ return null;
+ }
+ if (!anyResponses) {
+ try { Thread.sleep(2000); } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+ } while (!anyResponses && System.currentTimeMillis() < giveUpTime);
+
+ Iterator<InternalDistributedMember> it = coordinators.iterator();
+ if (coordinators.size() == 1) {
+ return it.next();
+ }
+ InternalDistributedMember oldest = it.next();
+ while (it.hasNext()) {
+ InternalDistributedMember candidate = it.next();
+ if (oldest.compareTo(candidate) > 0) {
+ oldest = candidate;
+ }
+ }
+ return oldest;
+ }
+
+
+ /**
+ * receives a JoinResponse holding a membership view or rejection message
+ * @param rsp
+ */
+ private void processJoinResponse(JoinResponseMessage rsp) {
+ synchronized(joinResponse) {
+ joinResponse[0] = rsp;
+ joinResponse.notify();
+ }
+ }
+
+ @Override
+ public NetView getView() {
+ return currentView;
+ }
+
+ @Override
+ public InternalDistributedMember getMemberID() {
+ return this.localAddress;
+ }
+
+ public void installView(NetView newView) {
+ synchronized(viewInstallationLock) {
+ if (currentView != null && currentView.getViewId() >= newView.getViewId()) {
+ // old view - ignore it
+ return;
+ }
+
+ if (checkForPartition(newView)) {
+ if (quorumRequired) {
+ List<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
+ services.getManager().forceDisconnect(
+ LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
+ }
+ return;
+ }
+
+ currentView = newView;
+ preparedView = null;
+ lastConflictingView = null;
+ services.installView(newView);
+
+ if (!newView.getCreator().equals(this.localAddress)) {
+ if (newView.shouldBeCoordinator(this.localAddress)) {
+ becomeCoordinator();
+ } else if (this.isCoordinator) {
+ // stop being coordinator
+ stateLock.writeLock().lock();
+ try {
+ stopCoordinatorServices();
+ this.isCoordinator = false;
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+ }
+ if (!this.isCoordinator) {
+ // get rid of outdated requests. It's possible some requests are
+ // newer than the view just processed - the senders will have to
+ // resend these
+ synchronized(viewRequests) {
+ for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
+ DistributionMessage m = it.next();
+ if (m instanceof JoinRequestMessage) {
+ it.remove();
+ } else if (m instanceof LeaveRequestMessage) {
+ if (!currentView.contains(((LeaveRequestMessage)m).getMemberID())) {
+ it.remove();
+ }
+ } else if (m instanceof RemoveMemberMessage) {
+ if (!currentView.contains(((RemoveMemberMessage)m).getMemberID())) {
+ it.remove();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+ * check to see if the new view shows a drop of 51% or more
+ */
+ private boolean checkForPartition(NetView newView) {
+ if (currentView == null) {
+ return false;
+ }
+ int oldWeight = currentView.memberWeight();
+ int failedWeight = newView.getCrashedMemberWeight(currentView);
+ if (failedWeight > 0) {
+ if (logger.isInfoEnabled()) {
+ newView.logCrashedMemberWeights(currentView, logger);
+ }
+ int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0);
+ if (failedWeight > failurePoint) {
+ services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView);
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ /** invoke this under the viewInstallationLock */
+ private void startCoordinatorServices() {
+ if (viewCreator == null || viewCreator.isShutdown()) {
+ viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
+ viewCreator.setDaemon(true);
+ viewCreator.start();
+ }
+ }
+
+ private void stopCoordinatorServices() {
+ if (viewCreator != null && !viewCreator.isShutdown()) {
+ viewCreator.shutdown();
+ }
+ }
+
+ public static void loadEmergencyClasses() {
+ }
+
+ @Override
+ public void emergencyClose() {
+ isStopping = true;
+ isJoined = false;
+ stopCoordinatorServices();
+ isCoordinator = false;
+ currentView = null;
+ }
+
+ public void beSick() {
+ }
+
+ public void playDead() {
+ }
+
+ public void beHealthy() {
+ }
+
+
+
+ @Override
+ public void start() {
+ }
+
+
+
+ @Override
+ public void started() {
+ this.localAddress = services.getMessenger().getMemberID();
+ }
+
+
+
+ @Override
+ public void stop() {
+ logger.debug("JoinLeave stopping");
+ leave();
+ }
+
+
+
+ @Override
+ public void stopped() {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+ @Override
+ public void leave() {
+ synchronized(viewInstallationLock) {
+ NetView view = currentView;
+ isStopping = true;
+ if (view != null) {
+ if (view.size() > 1) {
+ if (this.isCoordinator) {
+ logger.debug("JoinLeave stopping coordination services");
+ stopCoordinatorServices();
+ NetView newView = new NetView(view, view.getViewId()+1);
+ newView.remove(localAddress);
+ InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials(this.localAddress));
+ m.setRecipients(newView.getMembers());
+ services.getMessenger().send(m);
+ try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); }
+ catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+ }
+ else {
+ logger.debug("JoinLeave sending a leave request to {}", view.getCoordinator());
+ LeaveRequestMessage m = new LeaveRequestMessage(view.getCoordinator(), this.localAddress);
+ services.getMessenger().send(m);
+ }
+ } // view.size
+ }// view != null
+ }
+
+ }
+
+
+
+ @Override
+ public void remove(InternalDistributedMember m, String reason) {
+ NetView v = this.currentView;
+ if (v != null) {
+ RemoveMemberMessage msg = new RemoveMemberMessage(v.getCoordinator(), m,
+ reason);
+ services.getMessenger().send(msg);
+ }
+ }
+
+ @Override
+ public void disableDisconnectOnQuorumLossForTesting() {
+ this.quorumRequired = false;
+ }
+
+ @Override
+ public void init(Services s) {
+ this.services = s;
+ services.getMessenger().addHandler(JoinRequestMessage.class, this);
+ services.getMessenger().addHandler(JoinResponseMessage.class, this);
+ services.getMessenger().addHandler(InstallViewMessage.class, this);
+ services.getMessenger().addHandler(ViewAckMessage.class, this);
+ services.getMessenger().addHandler(LeaveRequestMessage.class, this);
+ services.getMessenger().addHandler(RemoveMemberMessage.class, this);
+ services.getMessenger().addHandler(JoinRequestMessage.class, this);
+ services.getMessenger().addHandler(JoinResponseMessage.class, this);
+
+ DistributionConfig dc = services.getConfig().getDistributionConfig();
+ int ackCollectionTimeout = dc.getMemberTimeout() * 2 * 12437 / 10000;
+ if (ackCollectionTimeout < 1500) {
+ ackCollectionTimeout = 1500;
+ } else if (ackCollectionTimeout > 12437) {
+ ackCollectionTimeout = 12437;
+ }
+ ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
+ this.viewAckTimeout = ackCollectionTimeout;
+
+ this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
+
+ DistributionConfig dconfig = services.getConfig().getDistributionConfig();
+ String bindAddr = dconfig.getBindAddress();
+ locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
+ }
+
+ @Override
+ public void processMessage(DistributionMessage m) {
+ if (isStopping) {
+ return;
+ }
+ logger.debug("JoinLeave processing {}", m);
+ switch (m.getDSFID()) {
+ case JOIN_REQUEST:
+ processJoinRequest((JoinRequestMessage)m);
+ break;
+ case JOIN_RESPONSE:
+ processJoinResponse((JoinResponseMessage)m);
+ break;
+ case INSTALL_VIEW_MESSAGE:
+ processViewMessage((InstallViewMessage)m);
+ break;
+ case VIEW_ACK_MESSAGE:
+ processViewAckMessage((ViewAckMessage)m);
+ break;
+ case LEAVE_REQUEST_MESSAGE:
+ processLeaveRequest((LeaveRequestMessage)m);
+ break;
+ case REMOVE_MEMBER_MESSAGE:
+ processRemoveRequest((RemoveMemberMessage)m);
+ break;
+ default:
+ throw new IllegalArgumentException("unknown message type: " + m);
+ }
+ }
+
+
+
+
+ class ViewReplyProcessor {
+ volatile int viewId = -1;
+ volatile Set<InternalDistributedMember> recipients;
+ volatile NetView conflictingView;
+ volatile InternalDistributedMember conflictingViewSender;
+ volatile boolean waiting;
+ final boolean isPrepareViewProcessor;
+
+ ViewReplyProcessor(boolean forPreparation) {
+ this.isPrepareViewProcessor = forPreparation;
+ }
+
+ void initialize(int viewId, Set<InternalDistributedMember> recips) {
+ this.waiting = true;
+ this.viewId = viewId;
+ this.recipients = recips;
+ this.conflictingView = null;
+ }
+
+ void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
+ if (!this.waiting) {
+ return;
+ }
+
+ if (viewId == this.viewId) {
+ if (conflictingView != null) {
+ this.conflictingViewSender = sender;
+ this.conflictingView = conflictingView;
+ }
+
+ Set<InternalDistributedMember> waitingFor = this.recipients;
+ synchronized(waitingFor) {
+ waitingFor.remove(sender);
+ if (waitingFor.isEmpty()) {
+ logger.debug("All view responses received - notifying waiting thread");
+ waitingFor.notify();
+ }
+ }
+
+ }
+ }
+
+ Set<InternalDistributedMember> waitForResponses() {
+ Set<InternalDistributedMember> result = this.recipients;
+ long endOfWait = System.currentTimeMillis() + viewAckTimeout;
+ try {
+ while (System.currentTimeMillis() < endOfWait
+ && (services.getCancelCriterion().cancelInProgress() == null)) {
+ try {
+ synchronized(result) {
+ result.wait(1000);
+ }
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while waiting for view resonses");
+ Thread.currentThread().interrupt();
+ return result;
+ }
+ }
+ } finally {
+ this.waiting = false;
+ }
+ return result;
+ }
+
+ NetView getConflictingView() {
+ return this.conflictingView;
+ }
+
+ InternalDistributedMember getConflictingViewSender() {
+ return this.conflictingViewSender;
+ }
+
+ Set<InternalDistributedMember> getUnresponsiveMembers() {
+ return this.recipients;
+ }
+ }
+
+
+
+
+
+ class ViewCreator extends Thread {
+ boolean shutdown = false;
+
+ ViewCreator(String name, ThreadGroup tg) {
+ super(tg, name);
+ }
+
+ void shutdown() {
+ shutdown = true;
+ synchronized(viewRequests) {
+ viewRequests.notify();
+ interrupt();
+ }
+ }
+
+ boolean isShutdown() {
+ return shutdown;
+ }
+
+ @Override
+ public void run() {
+ List<DistributionMessage> requests = null;
+ logger.info("View Creator thread is starting");
+ long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+ try {
+ for (;;) {
+ synchronized(viewRequests) {
+ if (shutdown) {
+ return;
+ }
+ if (viewRequests.isEmpty()) {
+ try {
+ logger.debug("View Creator is waiting for requests");
+ viewRequests.wait();
+ } catch (InterruptedException e) {
+ return;
+ }
+ } else {
+ if (System.currentTimeMillis() < okayToCreateView) {
+ // sleep to let more requests arrive
+ try {
+ sleep(100);
+ continue;
+ } catch (InterruptedException e) {
+ return;
+ }
+ } else {
+ // time to create a new membership view
+ if (requests == null) {
+ requests = new ArrayList<DistributionMessage>(viewRequests);
+ } else {
+ requests.addAll(viewRequests);
+ }
+ viewRequests.clear();
+ okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+ }
+ }
+ } // synchronized
+ if (requests != null && !requests.isEmpty()) {
+ logger.debug("View Creator is processing {} requests for the next membership view", requests.size());
+ /*boolean success = */createAndSendView(requests);
+ requests = null;
+ }
+ }
+ } finally {
+ shutdown = true;
+ }
+ }
+
+ /**
+ * Create a new membership view and send it to members (including crashed members).
+ * Returns false if the view cannot be prepared successfully, true otherwise
+ */
+ boolean createAndSendView(List<DistributionMessage> requests) {
+ List<InternalDistributedMember> joinReqs = new ArrayList<InternalDistributedMember>();
+ List<InternalDistributedMember> leaveReqs = new ArrayList<InternalDistributedMember>();
+ List<InternalDistributedMember> removalReqs = new ArrayList<InternalDistributedMember>();
+ List<String> removalReasons = new ArrayList<String>();
+
+ for (DistributionMessage msg: requests) {
+ logger.debug("processing request {}", msg);
+ if (msg instanceof JoinRequestMessage) {
+ InternalDistributedMember mbr = ((JoinRequestMessage)msg).getMemberID();
+ joinReqs.add(mbr);
+ }
+ else if (msg instanceof LeaveRequestMessage) {
+ leaveReqs.add(((LeaveRequestMessage) msg).getMemberID());
+ }
+ else if (msg instanceof RemoveMemberMessage) {
+ removalReqs.add(((RemoveMemberMessage) msg).getMemberID());
+ removalReasons.add(((RemoveMemberMessage) msg).getReason());
+ }
+ else {
+ // TODO: handle removals
+ logger.warn("Unknown membership request encountered: {}", msg);
+ }
+ }
+
+ NetView newView;
+ synchronized(viewInstallationLock) {
+ int viewNumber = 0;
+ List<InternalDistributedMember> mbrs;
+ if (currentView == null) {
+ mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size());
+ } else {
+ viewNumber = currentView.getViewId()+1;
+ mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
+ }
+ mbrs.addAll(joinReqs);
+ mbrs.removeAll(leaveReqs);
+ mbrs.removeAll(removalReqs);
+ newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs,
+ removalReqs);
+ }
+
+ for (InternalDistributedMember mbr: joinReqs) {
+ mbr.setVmViewId(newView.getViewId());
+ }
+ // send removal messages before installing the view so we stop
+ // getting messages from members that have been kicked out
+ sendRemoveMessages(removalReqs, removalReasons, newView);
+
+ // we want to always check for quorum loss but don't act on it
+ // unless network-partition-detection is enabled
+ if ( !(checkForPartition(newView) && quorumRequired) ) {
+ sendJoinResponses(joinReqs, newView);
+ }
+
+ if (quorumRequired) {
+ boolean prepared = false;
+ do {
+ if (this.shutdown || Thread.currentThread().isInterrupted()) {
+ return false;
+ }
+ prepared = prepareView(newView, joinReqs);
+ if (!prepared && quorumRequired) {
+ Set<InternalDistributedMember> unresponsive = prepareProcessor.getUnresponsiveMembers();
+ try {
+ removeHealthyMembers(unresponsive);
+ } catch (InterruptedException e) {
+ // abort the view if interrupted
+ shutdown = true;
+ return false;
+ }
+
+ if (!unresponsive.isEmpty()) {
+ List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(currentView.getCrashedMembers().size() + unresponsive.size());
+ failures.addAll(unresponsive);
+
+ NetView conflictingView = prepareProcessor.getConflictingView();
+ if (conflictingView != null
+ && !conflictingView.getCreator().equals(localAddress)
+ && conflictingView.getViewId() > newView.getViewId()
+ && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
+ lastConflictingView = conflictingView;
+ failures.addAll(conflictingView.getCrashedMembers());
+ }
+
+ failures.removeAll(removalReqs);
+ if (failures.size() > 0) {
+ // abort the current view and try again
+ removalReqs.addAll(failures);
+ newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
+ removalReqs);
+ }
+ }
+ }
+ } while (!prepared);
+ } // quorumRequired
+
+ lastConflictingView = null;
+
+ sendView(newView, joinReqs);
+ return true;
+ }
+
+ /**
+ * performs health checks on the collection of members, removing any that
+ * are found to be healthy
+ * @param mbrs
+ */
+ private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
+ List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
+
+ for (InternalDistributedMember mbr: mbrs) {
+ final InternalDistributedMember fmbr = mbr;
+ checkers.add(new Callable<InternalDistributedMember>() {
+ @Override
+ public InternalDistributedMember call() throws Exception {
+ // return the member id if it fails health checks
+ logger.info("checking state of member " + fmbr);
+ if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
+ logger.info("member " + fmbr + " passed availability check");
+ return fmbr;
+ }
+ logger.info("member " + fmbr + " failed availability check");
+ return null;
+ }
+ });
+ }
+
+ ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
+ AtomicInteger i = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(Services.getThreadGroup(), r,
+ "Member verification thread " + i.incrementAndGet());
+ }
+ });
+
+ try {
+ List<Future<InternalDistributedMember>> futures;
+ futures = svc.invokeAll(checkers);
+
+ for (Future<InternalDistributedMember> future: futures) {
+ try {
+ InternalDistributedMember mbr = future.get(viewAckTimeout, TimeUnit.MILLISECONDS);
+ if (mbr != null) {
+ logger.debug("disregarding lack of acknowledgement from {}", mbr);
+ mbrs.remove(mbr);
+ }
+ } catch (java.util.concurrent.TimeoutException e) {
+ // TODO should the member be removed if we can't verify it in time?
+ } catch (ExecutionException e) {
+ logger.info("unexpected exception caught during member verification", e);
+ }
+ }
+ } finally {
+ svc.shutdownNow();
+ }
+ }
+ }
+
+}
[3/3] incubator-geode git commit: GEODE-77: Implement Authenticator
interface in class GMSAuthenticator with unit tests.
Posted by qi...@apache.org.
GEODE-77: Implement Authenticator interface in class GMSAuthenticator with unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0a70d514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0a70d514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0a70d514
Branch: refs/heads/feature/GEODE-77
Commit: 0a70d5140277b366be4d0b9607a5e68936837c2f
Parents: 52f8ce6
Author: Qihong Chen <qc...@pivotal.io>
Authored: Thu Aug 13 09:55:33 2015 -0700
Committer: Qihong Chen <qc...@pivotal.io>
Committed: Thu Aug 13 09:55:33 2015 -0700
----------------------------------------------------------------------
.../internal/InternalDistributedSystem.java | 1 +
.../internal/membership/gms/Services.java | 22 +-
.../membership/gms/auth/GMSAuthenticator.java | 155 +-
.../gms/interfaces/Authenticator.java | 2 +-
.../membership/gms/membership/GMSJoinLeave.java | 2356 +++++++++---------
.../gemfire/internal/i18n/LocalizedStrings.java | 9 +-
.../gms/auth/GMSAuthenticatorJUnitTest.java | 300 +++
7 files changed, 1651 insertions(+), 1194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index d03f558..8a4e20a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -564,6 +564,7 @@ public final class InternalDistributedSystem
this.securityLogWriter.fine("SecurityLogWriter is created.");
}
+ Services.setLogWriter(this.logWriter);
Services.setSecurityLogWriter(this.securityLogWriter);
this.clock = new DSClock(this.isLoner);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 79830b9..8a27cf0 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -36,6 +36,7 @@ public class Services {
private static final ThreadGroup threadGroup = LoggingThreadGroup.createThreadGroup("Membership", logger);
+ private static InternalLogWriter staticLogWriter;
private static InternalLogWriter staticSecurityLogWriter;
final private Manager manager;
@@ -48,6 +49,7 @@ public class Services {
final private DMStats stats;
final private Stopper cancelCriterion;
+ private InternalLogWriter logWriter;
private InternalLogWriter securityLogWriter;
private Timer timer = new Timer("Membership Timer", true);
@@ -95,6 +97,8 @@ public class Services {
// TODO fix this so that IDS doesn't know about Services
securityLogWriter = staticSecurityLogWriter;
staticSecurityLogWriter = null;
+ logWriter = staticLogWriter;
+ staticLogWriter = null;
this.auth.init(this);
this.messenger.init(this);
this.manager.init(this);
@@ -154,12 +158,20 @@ public class Services {
this.manager.stop();
this.timer.cancel();
}
-
- public static void setSecurityLogWriter(InternalLogWriter writer) {
- staticSecurityLogWriter = writer;
+
+ public static void setLogWriter(InternalLogWriter writer) {
+ staticLogWriter = writer;
}
-
- public LogWriter getSecurityLogWriter() {
+
+ public static void setSecurityLogWriter(InternalLogWriter securityWriter) {
+ staticSecurityLogWriter = securityWriter;
+ }
+
+ public InternalLogWriter getLogWriter() {
+ return this.logWriter;
+ }
+
+ public InternalLogWriter getSecurityLogWriter() {
return this.securityLogWriter;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
index c008171..7e7072d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
@@ -1,17 +1,42 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.auth;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator;
+import com.gemstone.gemfire.internal.ClassLoadUtil;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.security.AuthInitialize;
import com.gemstone.gemfire.security.AuthenticationFailedException;
+import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+import java.lang.reflect.Method;
+import java.security.Principal;
+import java.util.Properties;
+import java.util.Set;
+
+// static messages
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.HandShake_AUTHENTICATOR_INSTANCE_COULD_NOT_BE_OBTAINED;
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.HandShake_FAILED_TO_ACQUIRE_AUTHENTICATOR_OBJECT;
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.HandShake_FAILED_TO_ACQUIRE_AUTHINITIALIZE_METHOD_0;
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.AUTH_PEER_AUTHENTICATION_FAILED_WITH_EXCEPTION;
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.AUTH_PEER_AUTHENTICATION_FAILED;
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.AUTH_PEER_AUTHENTICATION_MISSING_CREDENTIALS;
+import static com.gemstone.gemfire.internal.i18n.LocalizedStrings.AUTH_FAILED_TO_ACQUIRE_AUTHINITIALIZE_INSTANCE;
+import static com.gemstone.gemfire.distributed.internal.DistributionConfig.SECURITY_PEER_AUTH_INIT_NAME;
+import static com.gemstone.gemfire.distributed.internal.DistributionConfig.SECURITY_PEER_AUTHENTICATOR_NAME;
+
public class GMSAuthenticator implements Authenticator {
+ private Services services;
+
@Override
public void init(Services s) {
- // TODO Auto-generated method stub
-
+ this.services = s;
}
@Override
@@ -62,17 +87,129 @@ public class GMSAuthenticator implements Authenticator {
}
+  /**
+   * Authenticates a peer member with the Authenticator whose static factory method
+   * is named by the "security-peer-authenticator" entry of this member's security
+   * properties (the "gemfire.sys.security-*" system properties).
+   *
+   * @param member the member asking to be authenticated
+   * @param credentials the credentials that member supplied
+   * @return null when authentication succeeds (including when no authenticator is
+   *         configured); otherwise a failure message
+   * @throws AuthenticationFailedException declared by the interface; failures are
+   *         currently reported through the returned message instead, and this
+   *         clause is expected to be removed
+   */
@Override
- public String authenticate(InternalDistributedMember m, Object credentials)
- throws AuthenticationFailedException {
- // TODO Auto-generated method stub
- return null;
+  public String authenticate(InternalDistributedMember member, Object credentials)
+      throws AuthenticationFailedException {
+    // the local member is named in the failure message returned to the peer
+    DistributedMember self = services.getJoinLeave().getMemberID();
+    return authenticate(member, credentials, securityProps, self);
+  }
+
+  // for unit test
+  /**
+   * Authenticates {@code member} with the Authenticator whose static factory method
+   * is named by the "security-peer-authenticator" entry of {@code secProps}.
+   *
+   * @param member the member to be authenticated
+   * @param credentials the credentials supplied by {@code member}; cast to
+   *        {@link Properties} before being handed to the authenticator
+   * @param secProps security properties used both to look up the authenticator and
+   *        to initialize it
+   * @param localMember the local member, named in the failure message returned
+   * @return null if authentication succeeds or no authenticator is configured;
+   *         otherwise a failure message
+   */
+  /* package */ String authenticate(
+      DistributedMember member, Object credentials, Properties secProps, DistributedMember localMember)
+      throws AuthenticationFailedException {
+
+    String authMethod = secProps.getProperty(SECURITY_PEER_AUTHENTICATOR_NAME);
+    if (authMethod == null || authMethod.length() == 0) {
+      return null; // peer authentication is not configured
+    }
+
+    InternalLogWriter securityLogWriter = services.getSecurityLogWriter();
+    String failMsg = null;
+    if (credentials != null) {
+      try {
+        // Pass secProps through so the authenticator is initialized with the same
+        // properties that named it, not with the instance field.
+        invokeAuthenticator(secProps, authMethod, member, credentials);
+      } catch (Exception ex) {
+        securityLogWriter.warning(
+            AUTH_PEER_AUTHENTICATION_FAILED_WITH_EXCEPTION,
+            new Object[] {member, authMethod, ex.getLocalizedMessage()}, ex);
+        failMsg = AUTH_PEER_AUTHENTICATION_FAILED.toLocalizedString(localMember);
+      }
+    } else { // No credentials - need to send failure message
+      securityLogWriter.warning(
+          AUTH_PEER_AUTHENTICATION_MISSING_CREDENTIALS, new Object[] {member, authMethod});
+      failMsg = AUTH_PEER_AUTHENTICATION_MISSING_CREDENTIALS.toLocalizedString(member, authMethod);
+    }
+    return failMsg;
}
+
+  /**
+   * Runs the peer Authenticator named by {@code authMethod}, initialized with this
+   * instance's security properties. Equivalent to
+   * {@code invokeAuthenticator(securityProps, authMethod, member, credentials)}.
+   */
+  /* package */ Principal invokeAuthenticator(String authMethod, DistributedMember member, Object credentials)
+      throws AuthenticationFailedException {
+    return invokeAuthenticator(securityProps, authMethod, member, credentials);
+  }
+
+  /**
+   * Obtains an Authenticator from the static factory method {@code authMethod},
+   * initializes it with {@code secProps}, and authenticates {@code member}.
+   * The authenticator, once obtained, is closed before this method returns.
+   *
+   * @param secProps security properties passed to the authenticator's init
+   * @param authMethod fully qualified name of the static factory method
+   * @param member the member being authenticated
+   * @param credentials the member's credentials; cast to {@link Properties}
+   * @return the Principal returned by the authenticator
+   * @throws AuthenticationFailedException if the factory returns null, or wrapping
+   *         any non-security exception raised while obtaining or running the
+   *         authenticator; GemFireSecurityExceptions are rethrown unchanged
+   */
+  /* package */ Principal invokeAuthenticator(Properties secProps, String authMethod,
+      DistributedMember member, Object credentials) throws AuthenticationFailedException {
+    com.gemstone.gemfire.security.Authenticator auth = null;
+    try {
+      Method getter = ClassLoadUtil.methodFromName(authMethod);
+      auth = (com.gemstone.gemfire.security.Authenticator) getter.invoke(null, (Object[]) null);
+      if (auth == null) {
+        throw new AuthenticationFailedException(
+            HandShake_AUTHENTICATOR_INSTANCE_COULD_NOT_BE_OBTAINED.toLocalizedString());
+      }
+
+      LogWriter logWriter = services.getLogWriter();
+      LogWriter securityLogWriter = services.getSecurityLogWriter();
+      auth.init(secProps, logWriter, securityLogWriter);
+      // NOTE(review): a non-Properties credentials object fails this cast and is
+      // reported through the generic catch below as "failed to acquire authenticator".
+      return auth.authenticate((Properties) credentials, member);
+    } catch (GemFireSecurityException gse) {
+      throw gse;
+    } catch (Exception ex) {
+      throw new AuthenticationFailedException(
+          HandShake_FAILED_TO_ACQUIRE_AUTHENTICATOR_OBJECT.toLocalizedString(), ex);
+    } finally {
+      if (auth != null) {
+        auth.close();
+      }
+    }
+  }
+
+ /**
+ * Get credential object for the given GemFire distributed member
+ * @param member the target distributed member
+ * @return the credential object
+ */
@Override
- public Object getCredentials() {
- // TODO Auto-generated method stub
- return null;
+ public Object getCredentials(InternalDistributedMember member) {
+ return getCredentials(member, securityProps);
+ }
+
+  // for unit test
+  /**
+   * Obtains credentials from the AuthInitialize whose static factory method is named
+   * by the "security-peer-auth-init" entry of {@code secProps}.
+   *
+   * @param member the member the credentials are requested for
+   * @param secProps security properties; also handed to AuthInitialize.getCredentials
+   * @return the credentials, or null when no AuthInitialize is configured
+   * @throws AuthenticationRequiredException if the factory returns null or any
+   *         non-security exception occurs; GemFireSecurityExceptions pass through
+   */
+  /* package */ Properties getCredentials(DistributedMember member, Properties secProps) {
+    String authMethod = secProps.getProperty(SECURITY_PEER_AUTH_INIT_NAME);
+    if (authMethod == null || authMethod.length() == 0) {
+      return null; // no peer AuthInitialize configured
+    }
+
+    try {
+      Method factory = ClassLoadUtil.methodFromName(authMethod);
+      AuthInitialize initializer = (AuthInitialize) factory.invoke(null, (Object[]) null);
+      if (initializer == null) {
+        throw new AuthenticationRequiredException(
+            AUTH_FAILED_TO_ACQUIRE_AUTHINITIALIZE_INSTANCE.toLocalizedString(authMethod));
+      }
+
+      try {
+        initializer.init(services.getLogWriter(), services.getSecurityLogWriter());
+        return initializer.getCredentials(secProps, member, true);
+      } finally {
+        // always release the AuthInitialize, even if init or getCredentials fails
+        initializer.close();
+      }
+    } catch (GemFireSecurityException gse) {
+      throw gse;
+    } catch (Exception ex) {
+      throw new AuthenticationRequiredException(
+          HandShake_FAILED_TO_ACQUIRE_AUTHINITIALIZE_METHOD_0.toLocalizedString(authMethod), ex);
+    }
+  }
+
+  private static final String secPrefix = "gemfire.sys.security-";
+  private static final int gemfireSysPrefixLen = "gemfire.sys.".length();
+  /** Security settings captured from "gemfire.sys.security-*" system properties at construction. */
+  private Properties securityProps = getSecurityProps();
+
+  /**
+   * Builds a Properties object from every system property whose name starts with
+   * "gemfire.sys.security-", with the "gemfire.sys." prefix stripped (for example,
+   * "gemfire.sys.security-peer-authenticator" becomes "security-peer-authenticator").
+   *
+   * @return a new, possibly empty, Properties object
+   */
+  Properties getSecurityProps() {
+    Properties props = new Properties();
+    Properties sysProps = System.getProperties();
+    // stringPropertyNames() is a snapshot limited to String keys with String values.
+    // Casting keys of the live keySet() instead can throw ClassCastException for a
+    // non-String key, and a non-String value makes getProperty return null, which
+    // setProperty rejects with NullPointerException.
+    for (String propKey : sysProps.stringPropertyNames()) {
+      if (propKey.startsWith(secPrefix)) {
+        String value = sysProps.getProperty(propKey);
+        if (value != null) { // the property may have been removed since the snapshot
+          props.setProperty(propKey.substring(gemfireSysPrefixLen), value);
+        }
+      }
+    }
+    return props;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Authenticator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Authenticator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Authenticator.java
index fa45ff3..596b265 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Authenticator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Authenticator.java
@@ -7,5 +7,5 @@ public interface Authenticator extends Service {
+  /**
+   * Authenticates member {@code m} using the credentials it supplied.
+   *
+   * @param m the member to authenticate
+   * @param credentials the credentials presented by {@code m}
+   * @return a failure message, or null if authentication succeeded (the convention
+   *         followed by GMSAuthenticator)
+   * @throws AuthenticationFailedException may be thrown by implementations;
+   *         GMSAuthenticator reports failures through the return value instead
+   */
String authenticate(InternalDistributedMember m, Object credentials) throws AuthenticationFailedException;
- Object getCredentials();
+  /**
+   * Returns the credentials to present for authentication, obtained with respect to
+   * member {@code m}.
+   *
+   * @param m the member the credentials are requested for
+   * @return the credential object; GMSAuthenticator returns a Properties instance, or
+   *         null when no peer AuthInitialize is configured
+   */
+ Object getCredentials(InternalDistributedMember m);
}