You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2016/12/06 00:17:02 UTC
[4/4] zookeeper git commit: ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL (rakeshr via phunt)
ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL (rakeshr via phunt)
Change-Id: I7ae6bd863d46621bba5b9abc908e1497111e0336
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/8a06bd1c
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/8a06bd1c
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/8a06bd1c
Branch: refs/heads/branch-3.4
Commit: 8a06bd1ccef382461c7b0a63f2012f4aeac90753
Parents: 967c3a7
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Mon Dec 5 16:15:37 2016 -0800
Committer: Patrick Hunt <ph...@apache.org>
Committed: Mon Dec 5 16:15:37 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 6 +
build.xml | 7 +-
ivy.xml | 28 +
src/java/main/org/apache/zookeeper/Login.java | 7 +-
.../zookeeper/SaslClientCallbackHandler.java | 104 +++
.../zookeeper/client/ZooKeeperSaslClient.java | 161 +---
.../zookeeper/server/ZooKeeperSaslServer.java | 114 +--
.../server/auth/SaslServerCallbackHandler.java | 10 +-
.../server/quorum/FastLeaderElection.java | 2 +
.../zookeeper/server/quorum/Follower.java | 5 +-
.../apache/zookeeper/server/quorum/Leader.java | 12 +-
.../apache/zookeeper/server/quorum/Learner.java | 24 +-
.../zookeeper/server/quorum/LearnerHandler.java | 27 +-
.../zookeeper/server/quorum/Observer.java | 8 +-
.../server/quorum/QuorumCnxManager.java | 347 +++++--
.../zookeeper/server/quorum/QuorumPeer.java | 181 +++-
.../server/quorum/QuorumPeerConfig.java | 48 +-
.../zookeeper/server/quorum/QuorumPeerMain.java | 39 +-
.../quorum/auth/NullQuorumAuthLearner.java | 33 +
.../quorum/auth/NullQuorumAuthServer.java | 34 +
.../server/quorum/auth/QuorumAuth.java | 96 ++
.../server/quorum/auth/QuorumAuthLearner.java | 40 +
.../server/quorum/auth/QuorumAuthServer.java | 41 +
.../quorum/auth/SaslQuorumAuthLearner.java | 230 +++++
.../quorum/auth/SaslQuorumAuthServer.java | 180 ++++
.../auth/SaslQuorumServerCallbackHandler.java | 148 +++
.../apache/zookeeper/util/SecurityUtils.java | 298 ++++++
src/java/test/data/kerberos/minikdc-krb5.conf | 30 +
src/java/test/data/kerberos/minikdc.ldiff | 52 ++
.../zookeeper/server/quorum/CnxManagerTest.java | 15 +-
.../quorum/FLEBackwardElectionRoundTest.java | 4 +-
.../server/quorum/FLECompatibilityTest.java | 4 +-
.../server/quorum/FLEDontCareTest.java | 10 +-
.../server/quorum/FLELostMessageTest.java | 2 +-
.../zookeeper/server/quorum/LearnerTest.java | 4 +-
.../server/quorum/QuorumCnxManagerTest.java | 925 +++++++++++++++++++
.../server/quorum/QuorumPeerTestBase.java | 65 +-
.../zookeeper/server/quorum/Zab1_0Test.java | 44 +-
.../quorum/auth/KerberosSecurityTestcase.java | 120 +++
.../server/quorum/auth/KerberosTestUtils.java | 76 ++
.../zookeeper/server/quorum/auth/MiniKdc.java | 418 +++++++++
.../server/quorum/auth/MiniKdcTest.java | 184 ++++
.../server/quorum/auth/QuorumAuthTestBase.java | 146 +++
.../quorum/auth/QuorumAuthUpgradeTest.java | 239 +++++
.../quorum/auth/QuorumDigestAuthTest.java | 221 +++++
.../quorum/auth/QuorumKerberosAuthTest.java | 110 +++
.../auth/QuorumKerberosHostBasedAuthTest.java | 184 ++++
.../apache/zookeeper/test/FLEPredicateTest.java | 2 +-
src/zookeeper.jute | 5 +
49 files changed, 4655 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b69758..eb9ef86 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,12 @@ IMPROVEMENTS:
ZOOKEEPER-2606: SaslServerCallbackHandler#handleAuthorizeCallback() should
log the exception (Ted Yu via fpj)
+NEW FEATURE:
+
+ ZOOKEEPER-1045: Support Quorum Peer mutual authentication via SASL
+ (rakeshr via phunt)
+
+
Release 3.4.9 - 2016-08-23
Backward compatible changes:
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 505597a..5c4fab2 100644
--- a/build.xml
+++ b/build.xml
@@ -75,6 +75,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
<property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" />
<property name="test.data.invalid.dir" value="${test.data.dir}/invalidsnap" />
<property name="test.data.buffersize.dir" value="${test.data.dir}/buffersize" />
+ <property name="test.data.kerberos.dir" value="${test.data.dir}/kerberos" />
<property name="test.cppunit.dir" value="${test.java.build.dir}/test-cppunit"/>
<property name="test.tmp.dir" value="${test.java.build.dir}/tmp" />
<property name="test.output" value="no" />
@@ -1242,6 +1243,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
<delete dir="${test.data.upgrade.dir}" />
<delete dir="${test.data.invalid.dir}" />
<delete dir="${test.data.buffersize.dir}" />
+ <delete dir="${test.data.kerberos.dir}" />
<delete dir="${test.data.dir}" />
<mkdir dir="${test.log.dir}" />
<mkdir dir="${test.tmp.dir}" />
@@ -1258,7 +1260,10 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
<copy todir="${test.data.buffersize.dir}">
<fileset dir="${basedir}/src/java/test/data/buffersize"/>
</copy>
-
+ <mkdir dir="${test.data.kerberos.dir}" />
+ <copy todir="${test.data.kerberos.dir}">
+ <fileset dir="${basedir}/src/java/test/data/kerberos"/>
+ </copy>
</target>
<condition property="quicktest">
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 95b0e5a..437b86b 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -74,6 +74,34 @@
rev="2.4" conf="releaseaudit->default"/>
<dependency org="commons-collections" name="commons-collections"
rev="3.2.2" conf="releaseaudit->default"/>
+
+ <dependency org="commons-io" name="commons-io" rev="2.4"
+ conf="test->default"/>
+
+ <dependency org="org.apache.kerby" name="kerb-simplekdc" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerby-config" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-core" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-server" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-common" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-admin" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-identity" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-client" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-util" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerb-crypto" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerby-util" rev="1.0.0-RC2"
+ conf="test->default"/>
+ <dependency org="org.apache.kerby" name="kerby-asn1" rev="1.0.0-RC2"
+ conf="test->default"/>
</dependencies>
</ivy-module>
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/Login.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/Login.java b/src/java/main/org/apache/zookeeper/Login.java
index aaa220c..3e21aae 100644
--- a/src/java/main/org/apache/zookeeper/Login.java
+++ b/src/java/main/org/apache/zookeeper/Login.java
@@ -32,8 +32,9 @@ import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.callback.CallbackHandler;
-import org.apache.log4j.Logger;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.Subject;
import java.util.Date;
@@ -41,7 +42,7 @@ import java.util.Random;
import java.util.Set;
public class Login {
- Logger LOG = Logger.getLogger(Login.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Login.class);
public CallbackHandler callbackHandler;
// LoginThread will sleep until 80% of time from last refresh to
@@ -291,7 +292,7 @@ public class Login {
}
LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
loginContext.login();
- LOG.info("successfully logged in.");
+ LOG.info("{} successfully logged in.", loginContextName);
return loginContext;
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java b/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
new file mode 100644
index 0000000..d6f5549
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/SaslClientCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Callback handler used by the SASL mechanisms to complete authentication.
+ * It implements javax.security.auth.callback.CallbackHandler and should not
+ * be confused with ZooKeeper packet callbacks like
+ * org.apache.zookeeper.server.auth.SaslServerCallbackHandler. Name and realm
+ * callbacks get their defaults; a password callback gets the configured
+ * password; authorization succeeds only when authid equals authzid.
+ */
+public class SaslClientCallbackHandler implements CallbackHandler {
+ private String password = null; // may be null; handle() then only logs a warning for a PasswordCallback
+ private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
+ private final String entity; // label substituted into the log and exception messages below
+ public SaslClientCallbackHandler(String password, String client) { // 'client' is stored as 'entity'
+ this.password = password;
+ this.entity = client;
+ }
+ // Answers each callback in turn; throws UnsupportedCallbackException for any callback type not handled below.
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nc = (NameCallback) callback;
+ nc.setName(nc.getDefaultName()); // accept the default name offered by the callback
+ }
+ else {
+ if (callback instanceof PasswordCallback) {
+ PasswordCallback pc = (PasswordCallback)callback;
+ if (password != null) {
+ pc.setPassword(this.password.toCharArray());
+ } else { // no password configured: nothing is set on the callback, only a warning is logged
+ LOG.warn("Could not login: the {} is being asked for a password, but the ZooKeeper {}" +
+ " code does not currently support obtaining a password from the user." +
+ " Make sure that the {} is configured to use a ticket cache (using" +
+ " the JAAS configuration setting 'useTicketCache=true)' and restart the {}. If" +
+ " you still get this message after that, the TGT in the ticket cache has expired and must" +
+ " be manually refreshed. To do so, first determine if you are using a password or a" +
+ " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+ " is running this Zookeeper {} using the command" +
+ " 'kinit <princ>' (where <princ> is the name of the {}'s Kerberos principal)." +
+ " If the latter, do" +
+ " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+ " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+ " restart this {}. If you continue to see this message after manually refreshing" +
+ " your cache, ensure that your KDC host's clock is in sync with this host's clock.",
+ new Object[]{entity, entity, entity, entity, entity, entity, entity}); // one argument per {} placeholder above
+ }
+ }
+ else {
+ if (callback instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) callback;
+ rc.setText(rc.getDefaultText()); // accept the default realm text offered by the callback
+ }
+ else {
+ if (callback instanceof AuthorizeCallback) {
+ AuthorizeCallback ac = (AuthorizeCallback) callback;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) { // authorize only when both IDs are identical
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ else {
+ throw new UnsupportedCallbackException(callback, "Unrecognized SASL " + entity + "Callback");
+ }
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
index 21ef0fa..f3c9d3c 100644
--- a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
+++ b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
@@ -19,22 +19,13 @@
package org.apache.zookeeper.client;
import java.io.IOException;
-import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
@@ -42,17 +33,13 @@ import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.Login;
+import org.apache.zookeeper.SaslClientCallbackHandler;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.GetSASLRequest;
import org.apache.zookeeper.proto.SetSASLResponse;
-import org.apache.zookeeper.server.auth.KerberosName;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.Oid;
+import org.apache.zookeeper.util.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,83 +213,14 @@ public class ZooKeeperSaslClient {
}
// note that the login object is static: it's shared amongst all zookeeper-related connections.
// in order to ensure the login is initialized only once, it must be synchronized the code snippet.
- login = new Login(loginContext, new ClientCallbackHandler(null));
+ login = new Login(loginContext, new SaslClientCallbackHandler(null, "Client"));
login.startThreadIfNeeded();
initializedLogin = true;
}
}
}
- Subject subject = login.getSubject();
- SaslClient saslClient;
- // Use subject.getPrincipals().isEmpty() as an indication of which SASL mechanism to use:
- // if empty, use DIGEST-MD5; otherwise, use GSSAPI.
- if (subject.getPrincipals().isEmpty()) {
- // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism instead.
- LOG.info("Client will use DIGEST-MD5 as SASL mechanism.");
- String[] mechs = {"DIGEST-MD5"};
- String username = (String)(subject.getPublicCredentials().toArray()[0]);
- String password = (String)(subject.getPrivateCredentials().toArray()[0]);
- // "zk-sasl-md5" is a hard-wired 'domain' parameter shared with zookeeper server code (see ServerCnxnFactory.java)
- saslClient = Sasl.createSaslClient(mechs, username, "zookeeper", "zk-sasl-md5", null, new ClientCallbackHandler(password));
- return saslClient;
- }
- else { // GSSAPI.
- boolean usingNativeJgss =
- Boolean.getBoolean("sun.security.jgss.native");
- if (usingNativeJgss) {
- // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
- // """
- // In addition, when performing operations as a particular
- // Subject, e.g. Subject.doAs(...) or Subject.doAsPrivileged(...),
- // the to-be-used GSSCredential should be added to Subject's
- // private credential set. Otherwise, the GSS operations will
- // fail since no credential is found.
- // """
- try {
- GSSManager manager = GSSManager.getInstance();
- Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
- GSSCredential cred = manager.createCredential(null,
- GSSContext.DEFAULT_LIFETIME,
- krb5Mechanism,
- GSSCredential.INITIATE_ONLY);
- subject.getPrivateCredentials().add(cred);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added private credential to subject: " + cred);
- }
- } catch (GSSException ex) {
- LOG.warn("Cannot add private credential to subject; " +
- "authentication at the server may fail", ex);
- }
- }
- final Object[] principals = subject.getPrincipals().toArray();
- // determine client principal from subject.
- final Principal clientPrincipal = (Principal)principals[0];
- final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
- // assume that server and client are in the same realm (by default; unless the system property
- // "zookeeper.server.realm" is set).
- String serverRealm = System.getProperty("zookeeper.server.realm",clientKerberosName.getRealm());
- KerberosName serviceKerberosName = new KerberosName(servicePrincipal+"@"+serverRealm);
- final String serviceName = serviceKerberosName.getServiceName();
- final String serviceHostname = serviceKerberosName.getHostName();
- final String clientPrincipalName = clientKerberosName.toString();
- try {
- saslClient = Subject.doAs(subject,new PrivilegedExceptionAction<SaslClient>() {
- public SaslClient run() throws SaslException {
- LOG.info("Client will use GSSAPI as SASL mechanism.");
- String[] mechs = {"GSSAPI"};
- LOG.debug("creating sasl client: client="+clientPrincipalName+";service="+serviceName+";serviceHostname="+serviceHostname);
- SaslClient saslClient = Sasl.createSaslClient(mechs,clientPrincipalName,serviceName,serviceHostname,null,new ClientCallbackHandler(null));
- return saslClient;
- }
- });
- return saslClient;
- }
- catch (Exception e) {
- LOG.error("Exception while trying to create SASL client", e);
- e.printStackTrace();
- return null;
- }
- }
+ return SecurityUtils.createSaslClient(login.getSubject(),
+ servicePrincipal, "zookeeper", "zk-sasl-md5", LOG, "Client");
} catch (LoginException e) {
// We throw LoginExceptions...
throw e;
@@ -471,75 +389,6 @@ public class ZooKeeperSaslClient {
}
}
- // The CallbackHandler interface here refers to
- // javax.security.auth.callback.CallbackHandler.
- // It should not be confused with Zookeeper packet callbacks like
- // org.apache.zookeeper.server.auth.SaslServerCallbackHandler.
- public static class ClientCallbackHandler implements CallbackHandler {
- private String password = null;
-
- public ClientCallbackHandler(String password) {
- this.password = password;
- }
-
- public void handle(Callback[] callbacks) throws
- UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- NameCallback nc = (NameCallback) callback;
- nc.setName(nc.getDefaultName());
- }
- else {
- if (callback instanceof PasswordCallback) {
- PasswordCallback pc = (PasswordCallback)callback;
- if (password != null) {
- pc.setPassword(this.password.toCharArray());
- } else {
- LOG.warn("Could not login: the client is being asked for a password, but the Zookeeper" +
- " client code does not currently support obtaining a password from the user." +
- " Make sure that the client is configured to use a ticket cache (using" +
- " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
- " you still get this message after that, the TGT in the ticket cache has expired and must" +
- " be manually refreshed. To do so, first determine if you are using a password or a" +
- " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
- " is running this Zookeeper client using the command" +
- " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
- " If the latter, do" +
- " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
- " <keytab> is the location of the keytab file). After manually refreshing your cache," +
- " restart this client. If you continue to see this message after manually refreshing" +
- " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
- }
- }
- else {
- if (callback instanceof RealmCallback) {
- RealmCallback rc = (RealmCallback) callback;
- rc.setText(rc.getDefaultText());
- }
- else {
- if (callback instanceof AuthorizeCallback) {
- AuthorizeCallback ac = (AuthorizeCallback) callback;
- String authid = ac.getAuthenticationID();
- String authzid = ac.getAuthorizationID();
- if (authid.equals(authzid)) {
- ac.setAuthorized(true);
- } else {
- ac.setAuthorized(false);
- }
- if (ac.isAuthorized()) {
- ac.setAuthorizedID(authzid);
- }
- }
- else {
- throw new UnsupportedCallbackException(callback,"Unrecognized SASL ClientCallback");
- }
- }
- }
- }
- }
- }
- }
-
public boolean clientTunneledAuthenticationInProgress() {
if (!isSASLConfigured) {
return false;
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
index 71870ce..dd6ee8f 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
@@ -18,22 +18,12 @@
package org.apache.zookeeper.server;
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.zookeeper.Login;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
+import org.apache.zookeeper.util.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,107 +41,9 @@ public class ZooKeeperSaslServer {
private SaslServer createSaslServer(final Login login) {
synchronized (login) {
Subject subject = login.getSubject();
- if (subject != null) {
- // server is using a JAAS-authenticated subject: determine service principal name and hostname from zk server's subject.
- if (subject.getPrincipals().size() > 0) {
- try {
- final Object[] principals = subject.getPrincipals().toArray();
- final Principal servicePrincipal = (Principal)principals[0];
-
- // e.g. servicePrincipalNameAndHostname := "zookeeper/myhost.foo.com@FOO.COM"
- final String servicePrincipalNameAndHostname = servicePrincipal.getName();
-
- int indexOf = servicePrincipalNameAndHostname.indexOf("/");
-
- // e.g. servicePrincipalName := "zookeeper"
- final String servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOf);
-
- // e.g. serviceHostnameAndKerbDomain := "myhost.foo.com@FOO.COM"
- final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf+1,servicePrincipalNameAndHostname.length());
-
- indexOf = serviceHostnameAndKerbDomain.indexOf("@");
- // e.g. serviceHostname := "myhost.foo.com"
- final String serviceHostname = serviceHostnameAndKerbDomain.substring(0,indexOf);
-
- final String mech = "GSSAPI"; // TODO: should depend on zoo.cfg specified mechs, but if subject is non-null, it can be assumed to be GSSAPI.
-
- LOG.debug("serviceHostname is '"+ serviceHostname + "'");
- LOG.debug("servicePrincipalName is '"+ servicePrincipalName + "'");
- LOG.debug("SASL mechanism(mech) is '"+ mech +"'");
-
- boolean usingNativeJgss =
- Boolean.getBoolean("sun.security.jgss.native");
- if (usingNativeJgss) {
- // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
- // """
- // In addition, when performing operations as a particular
- // Subject, e.g. Subject.doAs(...) or
- // Subject.doAsPrivileged(...), the to-be-used
- // GSSCredential should be added to Subject's
- // private credential set. Otherwise, the GSS operations
- // will fail since no credential is found.
- // """
- try {
- GSSManager manager = GSSManager.getInstance();
- Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
- GSSName gssName = manager.createName(
- servicePrincipalName + "@" + serviceHostname,
- GSSName.NT_HOSTBASED_SERVICE);
- GSSCredential cred = manager.createCredential(gssName,
- GSSContext.DEFAULT_LIFETIME,
- krb5Mechanism,
- GSSCredential.ACCEPT_ONLY);
- subject.getPrivateCredentials().add(cred);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added private credential to subject: " + cred);
- }
- } catch (GSSException ex) {
- LOG.warn("Cannot add private credential to subject; " +
- "clients authentication may fail", ex);
- }
- }
- try {
- return Subject.doAs(subject,new PrivilegedExceptionAction<SaslServer>() {
- public SaslServer run() {
- try {
- SaslServer saslServer;
- saslServer = Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, login.callbackHandler);
- return saslServer;
- }
- catch (SaslException e) {
- LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: " + e);
- e.printStackTrace();
- return null;
- }
- }
- }
- );
- }
- catch (PrivilegedActionException e) {
- // TODO: exit server at this point(?)
- LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:" + e);
- e.printStackTrace();
- }
- }
- catch (IndexOutOfBoundsException e) {
- LOG.error("server principal name/hostname determination error: ", e);
- }
- }
- else {
- // JAAS non-GSSAPI authentication: assuming and supporting only DIGEST-MD5 mechanism for now.
- // TODO: use 'authMech=' value in zoo.cfg.
- try {
- SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5","zookeeper","zk-sasl-md5",null, login.callbackHandler);
- return saslServer;
- }
- catch (SaslException e) {
- LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation", e);
- }
- }
- }
+ return SecurityUtils.createSaslServer(subject, "zookeeper",
+ "zk-sasl-md5", login.callbackHandler, LOG);
}
- LOG.error("failed to create saslServer object.");
- return null;
}
public byte[] evaluateResponse(byte[] response) throws SaslException {
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java b/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
index 7fdffde..9f53a4d 100644
--- a/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
@@ -46,13 +46,15 @@ public class SaslServerCallbackHandler implements CallbackHandler {
private String userName;
private final Map<String,String> credentials = new HashMap<String,String>();
- public SaslServerCallbackHandler(Configuration configuration) throws IOException {
- String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
- ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
+ public SaslServerCallbackHandler(Configuration configuration)
+ throws IOException {
+ String serverSection = System.getProperty(
+ ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
+ ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(serverSection);
if (configurationEntries == null) {
- String errorMessage = "Could not find a 'Server' entry in this configuration: Server cannot start.";
+ String errorMessage = "Could not find a '" + serverSection + "' entry in this configuration: Server cannot start.";
LOG.error(errorMessage);
throw new IOException(errorMessage);
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index 78f3aa6..2a3d4fd 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -997,6 +997,8 @@ public class FastLeaderElection implements Election {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
+ LOG.debug("Number of connection processing threads: {}",
+ manager.getConnectionThreadCount());
}
}
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
index 043a522..9aa0d0b 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.jute.Record;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;
@@ -64,9 +65,9 @@ public class Follower extends Learner{
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
- InetSocketAddress addr = findLeader();
+ QuorumServer leaderServer = findLeader();
try {
- connectToLeader(addr);
+ connectToLeader(leaderServer.addr, leaderServer.hostname);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
//check to see if the leader zxid is lower than ours
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
index c83d352..710745d 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
@@ -19,6 +19,7 @@
package org.apache.zookeeper.server.quorum;
import java.io.ByteArrayOutputStream;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
@@ -32,10 +33,12 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.security.sasl.SaslException;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.server.FinalRequestProcessor;
@@ -318,7 +321,10 @@ public class Leader {
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
- LearnerHandler fh = new LearnerHandler(s, Leader.this);
+
+ BufferedInputStream is = new BufferedInputStream(
+ s.getInputStream());
+ LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
if (stop) {
@@ -332,6 +338,8 @@ public class Leader {
} else {
throw e;
}
+ } catch (SaslException e){
+ LOG.error("Exception while connecting to quorum learner", e);
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
index 647b8a2..749b274 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
@@ -39,8 +39,6 @@ import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooTrace;
@@ -48,6 +46,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class is the superclass of two of the three main actors in a ZK
@@ -191,8 +191,8 @@ public class Learner {
/**
* Returns the address of the node we think is the leader.
*/
- protected InetSocketAddress findLeader() {
- InetSocketAddress addr = null;
+ protected QuorumServer findLeader() {
+ QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
@@ -200,27 +200,28 @@ public class Learner {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
- addr = s.addr;
+ leaderServer = s;
break;
}
}
- if (addr == null) {
+ if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
- return addr;
+ return leaderServer;
}
/**
* Establish a connection with the Leader found by findLeader. Retries
* 5 times before giving up.
* @param addr - the address of the Leader to connect to.
- * @throws IOException - if the socket connection fails on the 5th attempt
+ * @throws IOException if the socket connection fails on the 5th attempt, or
+ * if there is an authentication failure while connecting to the leader
* @throws ConnectException
* @throws InterruptedException
*/
- protected void connectToLeader(InetSocketAddress addr)
- throws IOException, ConnectException, InterruptedException {
+ protected void connectToLeader(InetSocketAddress addr, String hostname)
+ throws IOException, ConnectException, InterruptedException {
sock = new Socket();
sock.setSoTimeout(self.tickTime * self.initLimit);
for (int tries = 0; tries < 5; tries++) {
@@ -241,6 +242,9 @@ public class Learner {
}
Thread.sleep(1000);
}
+
+ self.authLearner.authenticate(sock, hostname);
+
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 8a748c7..51ed7e7 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -32,6 +32,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import javax.security.sasl.SaslException;
+
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
@@ -153,15 +155,30 @@ public class LearnerHandler extends ZooKeeperThread {
private BinaryOutputArchive oa;
+ private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
- LearnerHandler(Socket sock, Leader leader) throws IOException {
+ LearnerHandler(Socket sock, BufferedInputStream bufferedInput,
+ Leader leader) throws IOException {
super("LearnerHandler-" + sock.getRemoteSocketAddress());
this.sock = sock;
this.leader = leader;
- leader.addLearnerHandler(this);
+ this.bufferedInput = bufferedInput;
+ try {
+ leader.self.authServer.authenticate(sock,
+ new DataInputStream(bufferedInput));
+ } catch (IOException e) {
+ LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection",
+ sock.getRemoteSocketAddress(), e);
+ try {
+ sock.close();
+ } catch (IOException ie) {
+ LOG.error("Exception while closing socket", ie);
+ }
+ throw new SaslException("Authentication failure: " + e.getMessage());
+ }
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -296,11 +313,11 @@ public class LearnerHandler extends ZooKeeperThread {
@Override
public void run() {
try {
+ leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick
+ leader.self.initLimit + leader.self.syncLimit;
- ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
- .getInputStream()));
+ ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Observer.java b/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
index e53f6f2..53f516f 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
@@ -19,11 +19,11 @@
package org.apache.zookeeper.server.quorum;
import java.io.IOException;
-import java.net.InetSocketAddress;
import org.apache.jute.Record;
import org.apache.zookeeper.server.ObserverBean;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
@@ -61,10 +61,10 @@ public class Observer extends Learner{
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
try {
- InetSocketAddress addr = findLeader();
- LOG.info("Observing " + addr);
+ QuorumServer leaderServer = findLeader();
+ LOG.info("Observing " + leaderServer.addr);
try {
- connectToLeader(addr);
+ connectToLeader(leaderServer.addr, leaderServer.hostname);
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
syncWithLeader(newLeaderZxid);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 20e5f16..74d1c1e 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -18,6 +18,7 @@
package org.apache.zookeeper.server.quorum;
+import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -28,20 +29,28 @@ import java.net.SocketException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
+import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.zookeeper.server.ZooKeeperThread;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperThread;
-
/**
* This class implements a connection manager for leader election using TCP. It
* maintains one connection for every pair of servers. The tricky part is to
@@ -89,7 +98,7 @@ public class QuorumCnxManager {
* Negative counter for observer server ids.
*/
- private long observerCounter = -1;
+ private AtomicLong observerCounter = new AtomicLong(-1);
/*
* Connection time out value in milliseconds
@@ -100,7 +109,20 @@ public class QuorumCnxManager {
/*
* Local IP address
*/
- final QuorumPeer self;
+ final long mySid;
+ final int socketTimeout;
+ final Map<Long, QuorumPeer.QuorumServer> view;
+ final boolean listenOnAllIPs;
+ private ThreadPoolExecutor connectionExecutor;
+ private final Set<Long> inprogressConnections = Collections
+ .synchronizedSet(new HashSet<Long>());
+ private QuorumAuthServer authServer;
+ private QuorumAuthLearner authLearner;
+ private boolean quorumSaslAuthEnabled;
+ /*
+ * Counter to count connection processing threads.
+ */
+ private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
/*
* Mapping from Peer to Thread number
@@ -145,7 +167,14 @@ public class QuorumCnxManager {
long sid;
}
- public QuorumCnxManager(QuorumPeer self) {
+ public QuorumCnxManager(final long mySid,
+ Map<Long,QuorumPeer.QuorumServer> view,
+ QuorumAuthServer authServer,
+ QuorumAuthLearner authLearner,
+ int socketTimeout,
+ boolean listenOnAllIPs,
+ int quorumCnxnThreadsSize,
+ boolean quorumSaslAuthEnabled) {
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
@@ -155,13 +184,53 @@ public class QuorumCnxManager {
if(cnxToValue != null){
this.cnxTO = new Integer(cnxToValue);
}
-
- this.self = self;
+
+ this.mySid = mySid;
+ this.socketTimeout = socketTimeout;
+ this.view = view;
+ this.listenOnAllIPs = listenOnAllIPs;
+
+ initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
+ quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
listener = new Listener();
}
+ private void initializeAuth(final long mySid,
+ final QuorumAuthServer authServer,
+ final QuorumAuthLearner authLearner,
+ final int quorumCnxnThreadsSize,
+ final boolean quorumSaslAuthEnabled) {
+ this.authServer = authServer;
+ this.authLearner = authLearner;
+ this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
+ if (!this.quorumSaslAuthEnabled) {
+ LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
+ return;
+ }
+
+ // init connection executors
+ final AtomicInteger threadIndex = new AtomicInteger(1);
+ SecurityManager s = System.getSecurityManager();
+ final ThreadGroup group = (s != null) ? s.getThreadGroup()
+ : Thread.currentThread().getThreadGroup();
+ ThreadFactory daemonThFactory = new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, "QuorumConnectionThread-"
+ + "[myid=" + mySid + "]-"
+ + threadIndex.getAndIncrement());
+ return t;
+ }
+ };
+ this.connectionExecutor = new ThreadPoolExecutor(3,
+ quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), daemonThFactory);
+ this.connectionExecutor.allowCoreThreadTimeOut(true);
+ }
+
/**
* Invokes initiateConnection for testing purposes
*
@@ -173,7 +242,8 @@ public class QuorumCnxManager {
}
Socket sock = new Socket();
setSockOpts(sock);
- sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
+ sock.connect(QuorumPeer.viewToVotingView(view).get(sid).electionAddr,
+ cnxTO);
initiateConnection(sock, sid);
}
@@ -181,28 +251,96 @@ public class QuorumCnxManager {
* If this server has initiated the connection, then it gives up on the
* connection if it loses challenge. Otherwise, it keeps the connection.
*/
- public boolean initiateConnection(Socket sock, Long sid) {
+ public void initiateConnection(final Socket sock, final Long sid) {
+ try {
+ startConnection(sock, sid);
+ } catch (IOException e) {
+ LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
+ new Object[] { sid, sock.getRemoteSocketAddress() }, e);
+ closeSocket(sock);
+ return;
+ }
+ }
+
+ /**
+ * Server will initiate the connection request to its peer server
+ * asynchronously via separate connection thread.
+ */
+ public void initiateConnectionAsync(final Socket sock, final Long sid) {
+ if(!inprogressConnections.add(sid)){
+ // simply return as there is a connection request to
+ // server 'sid' already in progress.
+ LOG.debug("Connection request to server id: {} is already in progress, so skipping this request",
+ sid);
+ closeSocket(sock);
+ return;
+ }
+ try {
+ connectionExecutor.execute(
+ new QuorumConnectionReqThread(sock, sid));
+ connectionThreadCnt.incrementAndGet();
+ } catch (Throwable e) {
+ // Important: to be safe, catch all types of exceptions and remove 'sid'
+ // from the in-progress connections. This avoids blocking further
+ // connection requests to this 'sid' in case of errors.
+ inprogressConnections.remove(sid);
+ LOG.error("Exception while submitting quorum connection request", e);
+ closeSocket(sock);
+ }
+ }
+
+ /**
+ * Thread to send connection request to peer server.
+ */
+ private class QuorumConnectionReqThread extends ZooKeeperThread {
+ final Socket sock;
+ final Long sid;
+ QuorumConnectionReqThread(final Socket sock, final Long sid) {
+ super("QuorumConnectionReqThread-" + sid);
+ this.sock = sock;
+ this.sid = sid;
+ }
+
+ @Override
+ public void run() {
+ try{
+ initiateConnection(sock, sid);
+ } finally {
+ inprogressConnections.remove(sid);
+ }
+ }
+ }
+
+ private boolean startConnection(Socket sock, Long sid)
+ throws IOException {
DataOutputStream dout = null;
+ DataInputStream din = null;
try {
// Sending id and challenge
dout = new DataOutputStream(sock.getOutputStream());
- dout.writeLong(self.getId());
+ dout.writeLong(this.mySid);
dout.flush();
+
+ din = new DataInputStream(
+ new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
-
+
+ // authenticate learner
+ authLearner.authenticate(sock, view.get(sid).hostname);
+
// If lost the challenge, then drop the new connection
- if (sid > self.getId()) {
+ if (sid > this.mySid) {
LOG.info("Have smaller server identifier, so dropping the " +
- "connection: (" + sid + ", " + self.getId() + ")");
+ "connection: (" + sid + ", " + this.mySid + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
- RecvWorker rw = new RecvWorker(sock, sid, sw);
+ RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -225,8 +363,6 @@ public class QuorumCnxManager {
return false;
}
-
-
/**
* If this server receives a connection request, then it gives up on the new
* connection if it wins. Notice that it checks whether it has a connection
@@ -234,12 +370,57 @@ public class QuorumCnxManager {
* possible long value to lose the challenge.
*
*/
- public void receiveConnection(Socket sock) {
+ public void receiveConnection(final Socket sock) {
+ DataInputStream din = null;
+ try {
+ din = new DataInputStream(
+ new BufferedInputStream(sock.getInputStream()));
+
+ handleConnection(sock, din);
+ } catch (IOException e) {
+ LOG.error("Exception handling connection, addr: {}, closing server connection",
+ sock.getRemoteSocketAddress());
+ closeSocket(sock);
+ }
+ }
+
+ /**
+ * Server receives a connection request and handles it asynchronously via
+ * separate thread.
+ */
+ public void receiveConnectionAsync(final Socket sock) {
+ try {
+ connectionExecutor.execute(
+ new QuorumConnectionReceiverThread(sock));
+ connectionThreadCnt.incrementAndGet();
+ } catch (Throwable e) {
+ LOG.error("Exception handling connection, addr: {}, closing server connection",
+ sock.getRemoteSocketAddress());
+ closeSocket(sock);
+ }
+ }
+
+ /**
+ * Thread to receive connection request from peer server.
+ */
+ private class QuorumConnectionReceiverThread extends ZooKeeperThread {
+ private final Socket sock;
+ QuorumConnectionReceiverThread(final Socket sock) {
+ super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
+ this.sock = sock;
+ }
+
+ @Override
+ public void run() {
+ receiveConnection(sock);
+ }
+ }
+
+ private void handleConnection(Socket sock, DataInputStream din)
+ throws IOException {
Long sid = null;
-
try {
// Read server id
- DataInputStream din = new DataInputStream(sock.getInputStream());
sid = din.readLong();
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
@@ -265,8 +446,7 @@ public class QuorumCnxManager {
* Choose identifier at random. We need a value to identify
* the connection.
*/
-
- sid = observerCounter--;
+ sid = observerCounter.getAndDecrement();
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
@@ -274,9 +454,12 @@ public class QuorumCnxManager {
LOG.warn("Exception reading or writing challenge: " + e.toString());
return;
}
-
+
+ // authenticate the learner
+ authServer.authenticate(sock, din);
+
//If wins the challenge, then close the new connection.
- if (sid < self.getId()) {
+ if (sid < this.mySid) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
@@ -297,7 +480,7 @@ public class QuorumCnxManager {
// Otherwise start worker threads to receive data.
} else {
SendWorker sw = new SendWorker(sock, sid);
- RecvWorker rw = new RecvWorker(sock, sid, sw);
+ RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -327,7 +510,7 @@ public class QuorumCnxManager {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
- if (self.getId() == sid) {
+ if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
@@ -361,28 +544,32 @@ public class QuorumCnxManager {
*
* @param sid server id
*/
-
- synchronized void connectOne(long sid){
- if (senderWorkerMap.get(sid) == null){
+ synchronized public void connectOne(long sid){
+ if (!connectedToPeer(sid)){
InetSocketAddress electionAddr;
- if (self.quorumPeers.containsKey(sid)) {
- electionAddr = self.quorumPeers.get(sid).electionAddr;
+ if (view.containsKey(sid)) {
+ electionAddr = view.get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening channel to server " + sid);
- }
+ LOG.debug("Opening channel to server " + sid);
Socket sock = new Socket();
setSockOpts(sock);
- sock.connect(self.getView().get(sid).electionAddr, cnxTO);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connected to server " + sid);
+ sock.connect(view.get(sid).electionAddr, cnxTO);
+ LOG.debug("Connected to server " + sid);
+
+ // Send the connection request asynchronously if quorum
+ // SASL authentication is enabled. This is required because the
+ // SASL server authentication process may take a few seconds to
+ // finish, which would otherwise delay subsequent peer connection requests.
+ if (quorumSaslAuthEnabled) {
+ initiateConnectionAsync(sock, sid);
+ } else {
+ initiateConnection(sock, sid);
}
- initiateConnection(sock, sid);
} catch (UnresolvedAddressException e) {
// Sun doesn't include the address that causes this
// exception to be thrown, also UAE cannot be wrapped cleanly
@@ -392,8 +579,8 @@ public class QuorumCnxManager {
+ " at election address " + electionAddr, e);
// Resolve hostname for this server in case the
// underlying ip address has changed.
- if (self.getView().containsKey(sid)) {
- self.getView().get(sid).recreateSocketAddresses();
+ if (view.containsKey(sid)) {
+ view.get(sid).recreateSocketAddresses();
}
throw e;
} catch (IOException e) {
@@ -403,8 +590,8 @@ public class QuorumCnxManager {
// We can't really tell if the server is actually down or it failed
// to connect to the server because the underlying IP address
// changed. Resolve the hostname again just in case.
- if (self.getView().containsKey(sid)) {
- self.getView().get(sid).recreateSocketAddresses();
+ if (view.containsKey(sid)) {
+ view.get(sid).recreateSocketAddresses();
}
}
} else {
@@ -451,6 +638,13 @@ public class QuorumCnxManager {
listener.halt();
softHalt();
+
+ // clear data structures used for auth
+ if (connectionExecutor != null) {
+ connectionExecutor.shutdown();
+ }
+ inprogressConnections.clear();
+ resetConnectionThreadCount();
}
/**
@@ -471,7 +665,7 @@ public class QuorumCnxManager {
*/
private void setSockOpts(Socket sock) throws SocketException {
sock.setTcpNoDelay(true);
- sock.setSoTimeout(self.tickTime * self.syncLimit);
+ sock.setSoTimeout(socketTimeout);
}
/**
@@ -494,11 +688,19 @@ public class QuorumCnxManager {
public long getThreadCount() {
return threadCnt.get();
}
+
+ /**
+ * Return number of connection processing threads.
+ */
+ public long getConnectionThreadCount() {
+ return connectionThreadCnt.get();
+ }
+
/**
- * Return reference to QuorumPeer
+ * Reset the value of connection processing threads count to zero.
*/
- public QuorumPeer getQuorumPeer() {
- return self;
+ private void resetConnectionThreadCount() {
+ connectionThreadCnt.set(0);
}
/**
@@ -525,22 +727,35 @@ public class QuorumCnxManager {
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
- if (self.getQuorumListenOnAllIPs()) {
- int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
+ if (listenOnAllIPs) {
+ int port = view.get(QuorumCnxManager.this.mySid)
+ .electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
- addr = self.quorumPeers.get(self.getId()).electionAddr;
+ addr = view.get(QuorumCnxManager.this.mySid)
+ .electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
- setName(self.quorumPeers.get(self.getId()).electionAddr
- .toString());
+ setName(view.get(QuorumCnxManager.this.mySid)
+ .electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
- receiveConnection(client);
+
+ // Receive and handle the connection request
+ // asynchronously if quorum SASL authentication is
+ // enabled. This is required because the SASL server
+ // authentication process may take a few seconds to finish,
+ // which would otherwise delay subsequent peer connection requests.
+ if (quorumSaslAuthEnabled) {
+ receiveConnectionAsync(client);
+ } else {
+ receiveConnection(client);
+ }
+
numRetries = 0;
}
} catch (IOException e) {
@@ -562,7 +777,7 @@ public class QuorumCnxManager {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
- + self.quorumPeers.get(self.getId()).electionAddr);
+ + view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}
@@ -573,7 +788,8 @@ public class QuorumCnxManager {
try{
LOG.debug("Trying to close listener: " + ss);
if(ss != null) {
- LOG.debug("Closing listener: " + self.getId());
+ LOG.debug("Closing listener: "
+ + QuorumCnxManager.this.mySid);
ss.close();
}
} catch (IOException e){
@@ -729,8 +945,9 @@ public class QuorumCnxManager {
}
}
} catch (Exception e) {
- LOG.warn("Exception when using channel: for id " + sid + " my id = " +
- self.getId() + " error = " + e);
+ LOG.warn("Exception when using channel: for id " + sid
+ + " my id = " + QuorumCnxManager.this.mySid
+ + " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread");
@@ -745,16 +962,16 @@ public class QuorumCnxManager {
Long sid;
Socket sock;
volatile boolean running = true;
- DataInputStream din;
+ final DataInputStream din;
final SendWorker sw;
- RecvWorker(Socket sock, Long sid, SendWorker sw) {
+ RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
super("RecvWorker:" + sid);
this.sid = sid;
this.sock = sock;
this.sw = sw;
+ this.din = din;
try {
- din = new DataInputStream(sock.getInputStream());
// OK to wait until socket disconnects while reading.
sock.setSoTimeout(0);
} catch (IOException e) {
@@ -807,8 +1024,8 @@ public class QuorumCnxManager {
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
- LOG.warn("Connection broken for id " + sid + ", my id = " +
- self.getId() + ", error = " , e);
+ LOG.warn("Connection broken for id " + sid + ", my id = "
+ + QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
@@ -930,4 +1147,8 @@ public class QuorumCnxManager {
throws InterruptedException {
return recvQueue.poll(timeout, unit);
}
+
+ public boolean connectedToPeer(long peerSid) {
+ return senderWorkerMap.get(peerSid) != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 2f0f21b..2dbedcf 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -34,8 +34,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+import javax.security.sasl.SaslException;
import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.jmx.MBeanRegistry;
@@ -45,6 +49,13 @@ import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
@@ -85,6 +96,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
LocalPeerBean jmxLocalPeerBean;
LeaderElectionBean jmxLeaderElectionBean;
QuorumCnxManager qcm;
+ QuorumAuthServer authServer;
+ QuorumAuthLearner authLearner;
/* ZKDatabase is a top level member of quorumpeer
* which will be used in all the zookeeperservers
@@ -102,7 +115,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
this.electionAddr = electionAddr;
}
- private QuorumServer(long id, InetSocketAddress addr) {
+ // VisibleForTesting
+ public QuorumServer(long id, InetSocketAddress addr) {
this.id = id;
this.addr = addr;
this.electionAddr = null;
@@ -338,6 +352,50 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
protected boolean quorumListenOnAllIPs = false;
/**
+ * Enables/disables quorum authentication using SASL. Defaults to false.
+ */
+ protected boolean quorumSaslEnableAuth;
+
+ /**
+ * If this is false, quorum peer server will accept another quorum peer client
+ * connection even if the authentication did not succeed. This can be used while
+ * upgrading ZooKeeper server. Defaults to false (authentication not required).
+ */
+ protected boolean quorumServerSaslAuthRequired;
+
+ /**
+ * If this is false, quorum peer learner will talk to quorum peer server
+ * without authentication. This can be used while upgrading ZooKeeper
+ * server. Defaults to false (authentication not required).
+ */
+ protected boolean quorumLearnerSaslAuthRequired;
+
+ /**
+ * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
+ */
+ protected String quorumServicePrincipal;
+
+ /**
+ * Quorum learner login context name in jaas-conf file to read the kerberos
+ * security details. Defaulting to 'QuorumLearner'.
+ */
+ protected String quorumLearnerLoginContext;
+
+ /**
+ * Quorum server login context name in jaas-conf file to read the kerberos
+ * security details. Defaulting to 'QuorumServer'.
+ */
+ protected String quorumServerLoginContext;
+
+ // TODO: need to tune the default value of thread size
+ private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
+ /**
+ * The maximum number of threads to allow in the connectionExecutors thread
+ * pool which will be used to initiate quorum server connections.
+ */
+ protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
+
+ /**
* @deprecated As of release 3.4.0, this class has been deprecated, since
* it is used with one of the udp-based versions of leader election, which
* we are also deprecating.
@@ -449,10 +507,15 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
private FileTxnSnapLog logFactory = null;
private final QuorumStats quorumStats;
-
- public QuorumPeer() {
+
+ public static QuorumPeer testingQuorumPeer() throws SaslException {
+ return new QuorumPeer();
+ }
+
+ private QuorumPeer() throws SaslException {
super("QuorumPeer");
quorumStats = new QuorumStats(this);
+ initialize();
}
@@ -490,7 +553,24 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
else this.quorumConfig = quorumConfig;
}
-
+
+ public void initialize() throws SaslException {
+ // init quorum auth server & learner
+ if (isQuorumSaslAuthEnabled()) {
+ Set<String> authzHosts = new HashSet<String>();
+ for (QuorumServer qs : getView().values()) {
+ authzHosts.add(qs.hostname);
+ }
+ authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(),
+ quorumServerLoginContext, authzHosts);
+ authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(),
+ quorumServicePrincipal, quorumLearnerLoginContext);
+ } else {
+ authServer = new NullQuorumAuthServer();
+ authLearner = new NullQuorumAuthLearner();
+ }
+ }
+
QuorumStats quorumStats() {
return quorumStats;
}
@@ -686,7 +766,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
le = new AuthFastLeaderElection(this, true);
break;
case 3:
- qcm = new QuorumCnxManager(this);
+ qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
@@ -903,33 +983,37 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
zkDb.close();
} catch (IOException ie) {
LOG.warn("Error closing logs ", ie);
- }
+ }
}
/**
* A 'view' is a node's current opinion of the membership of the entire
- * ensemble.
+ * ensemble.
*/
public Map<Long,QuorumPeer.QuorumServer> getView() {
return Collections.unmodifiableMap(this.quorumPeers);
}
-
+
/**
* Observers are not contained in this view, only nodes with
- * PeerType=PARTICIPANT.
+ * PeerType=PARTICIPANT.
*/
public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
- Map<Long,QuorumPeer.QuorumServer> ret =
+ return QuorumPeer.viewToVotingView(getView());
+ }
+
+ static Map<Long,QuorumPeer.QuorumServer> viewToVotingView(
+ Map<Long,QuorumPeer.QuorumServer> view) {
+ Map<Long,QuorumPeer.QuorumServer> ret =
new HashMap<Long, QuorumPeer.QuorumServer>();
- Map<Long,QuorumPeer.QuorumServer> view = getView();
- for (QuorumServer server : view.values()) {
+ for (QuorumServer server : view.values()) {
if (server.type == LearnerType.PARTICIPANT) {
ret.put(server.id, server);
}
- }
+ }
return ret;
}
-
+
/**
* Returns only observers, no followers.
*/
@@ -1306,4 +1390,73 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
}
}
+ void setQuorumServerSaslRequired(boolean serverSaslRequired) {
+ quorumServerSaslAuthRequired = serverSaslRequired;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED,
+ serverSaslRequired);
+ }
+
+ void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
+ quorumLearnerSaslAuthRequired = learnerSaslRequired;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED,
+ learnerSaslRequired);
+ }
+
+ void setQuorumSaslEnabled(boolean enableAuth) {
+ quorumSaslEnableAuth = enableAuth;
+ if (!quorumSaslEnableAuth) {
+ LOG.info("QuorumPeer communication is not secured!");
+ } else {
+ LOG.info("{} set to {}",
+ QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
+ }
+ }
+
+ void setQuorumServicePrincipal(String servicePrincipal) {
+ quorumServicePrincipal = servicePrincipal;
+ LOG.info("{} set to {}",QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL,
+ quorumServicePrincipal);
+ }
+
+ void setQuorumLearnerLoginContext(String learnerContext) {
+ quorumLearnerLoginContext = learnerContext;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
+ quorumLearnerLoginContext);
+ }
+
+ void setQuorumServerLoginContext(String serverContext) {
+ quorumServerLoginContext = serverContext;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT,
+ quorumServerLoginContext);
+ }
+
+ void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
+ if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
+ quorumCnxnThreadsSize = qCnxnThreadsSize;
+ }
+ LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
+ }
+
+ boolean isQuorumSaslAuthEnabled() {
+ return quorumSaslEnableAuth;
+ }
+
+ private boolean isQuorumServerSaslAuthRequired() {
+ return quorumServerSaslAuthRequired;
+ }
+
+ private boolean isQuorumLearnerSaslAuthRequired() {
+ return quorumLearnerSaslAuthRequired;
+ }
+
+ public QuorumCnxManager createCnxnManager() {
+ return new QuorumCnxManager(this.getId(),
+ this.getView(),
+ this.authServer,
+ this.authLearner,
+ this.tickTime * this.syncLimit,
+ this.getQuorumListenOnAllIPs(),
+ this.quorumCnxnThreadsSize,
+ this.isQuorumSaslAuthEnabled());
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/8a06bd1c/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index 0924ef6..621f830 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -38,6 +38,7 @@ import org.slf4j.MDC;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -75,7 +76,16 @@ public class QuorumPeerConfig {
protected boolean syncEnabled = true;
protected LearnerType peerType = LearnerType.PARTICIPANT;
-
+
+ /** Configurations for the quorumpeer-to-quorumpeer sasl authentication */
+ protected boolean quorumServerRequireSasl = false;
+ protected boolean quorumLearnerRequireSasl = false;
+ protected boolean quorumEnableSasl = false;
+ protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
+ protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
+ protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
+ protected int quorumCnxnThreadsSize;
+
/**
* Minimum snapshot retain count.
* @see org.apache.zookeeper.server.PurgeTxnLog#purge(File, File, int)
@@ -246,11 +256,45 @@ public class QuorumPeerConfig {
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
serverWeight.put(sid, Long.parseLong(value));
+ } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
+ quorumEnableSasl = Boolean.parseBoolean(value);
+ } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
+ quorumServerRequireSasl = Boolean.parseBoolean(value);
+ } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
+ quorumLearnerRequireSasl = Boolean.parseBoolean(value);
+ } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
+ quorumLearnerLoginContext = value;
+ } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
+ quorumServerLoginContext = value;
+ } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
+ quorumServicePrincipal = value;
+ } else if (key.equals("quorum.cnxn.threads.size")) {
+ quorumCnxnThreadsSize = Integer.parseInt(value);
} else {
System.setProperty("zookeeper." + key, value);
}
}
-
+ if (!quorumEnableSasl && quorumServerRequireSasl) {
+ throw new IllegalArgumentException(
+ QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+ + " is disabled, so cannot enable "
+ + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
+ }
+ if (!quorumEnableSasl && quorumLearnerRequireSasl) {
+ throw new IllegalArgumentException(
+ QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+ + " is disabled, so cannot enable "
+ + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
+ }
+ // If the quorum peer learner does not require auth, this peer won't be able to
+ // join a quorum whose servers require it. So this check ensures the learner
+ // also requires SASL whenever the quorum server is set to require SASL.
+ if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
+ throw new IllegalArgumentException(
+ QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
+ + " is disabled, so cannot enable "
+ + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
+ }
// Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
// PurgeTxnLog.purge(File, File, int) will not allow to purge less
// than 3.