You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/28 18:48:47 UTC
[2/2] kafka git commit: KAFKA-2675; SASL/Kerberos follow up
KAFKA-2675; SASL/Kerberos follow up
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Sriharsha Chintalapani <sc...@hortonworks.com>, Jun Rao <ju...@gmail.com>
Closes #376 from ijuma/KAFKA-2675-sasl-kerberos-follow-up
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9855bb9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9855bb9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9855bb9c
Branch: refs/heads/trunk
Commit: 9855bb9c655ebad0ed5586175452056466189b8a
Parents: b6fe164
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Oct 28 10:48:39 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 28 10:48:39 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 2 -
.../kafka/clients/producer/ProducerConfig.java | 2 -
.../apache/kafka/common/config/SaslConfigs.java | 24 +-
.../common/network/SaslChannelBuilder.java | 2 +-
.../authenticator/SaslClientAuthenticator.java | 21 +-
.../authenticator/SaslServerAuthenticator.java | 10 +-
.../common/security/kerberos/KerberosName.java | 8 +-
.../security/kerberos/KerberosNameParser.java | 12 +-
.../kafka/common/security/kerberos/Login.java | 16 +-
.../common/security/kerberos/LoginManager.java | 12 +
.../distributed/DistributedHerderConfig.java | 3 +-
.../controller/ControllerChannelManager.scala | 21 +-
.../main/scala/kafka/server/KafkaConfig.scala | 10 +-
.../kafka/api/BaseConsumerTest.scala | 357 ++-----------------
.../kafka/api/BaseProducerSendTest.scala | 151 ++++----
.../kafka/api/IntegrationTestHarness.scala | 10 +-
.../kafka/api/PlaintextConsumerTest.scala | 302 +++++++++++++++-
.../integration/kafka/api/QuotasTest.scala | 3 +-
.../integration/kafka/api/SaslTestHarness.scala | 44 ++-
.../integration/KafkaServerTestHarness.scala | 2 -
.../unit/kafka/server/KafkaConfigTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 31 +-
22 files changed, 531 insertions(+), 514 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 14c54c2..b366efd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -304,11 +304,9 @@ public class ConsumerConfig extends AbstractConfig {
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
- .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
- .define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6d40b77..e9d9fad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -282,11 +282,9 @@ public class ProducerConfig extends AbstractConfig {
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
- .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
- .define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 0abefe7..657c6d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -21,34 +21,32 @@ public class SaslConfigs {
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/
- public static final String SASL_KAFKA_SERVER_REALM = "sasl.kafka.server.realm";
- public static final String SASL_KAFKA_SERVER_DOC = "The sasl kafka server realm. "
- + "Default will be from kafka jaas config";
-
public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
- + "This can be defined either in the JAAS config or in the Kakfa config.";
+ + "This can be defined either in Kafka's JAAS config or in Kafka's config.";
public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path. "
- + "Default will be /usr/bin/kinit";
+ + "Default is /usr/bin/kinit";
public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = "sasl.kerberos.ticket.renew.window.factor";
- public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "LoginThread will sleep until specified window factor of time from last refresh"
- + " to ticket's expiry has been reached, at which time it will wake and try to renew the ticket.";
+ public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "Login thread will sleep until the specified window factor of time from last refresh"
+ + " to ticket's expiry has been reached, at which time it will try to renew the ticket.";
public static final double DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = 0.80;
public static final String SASL_KERBEROS_TICKET_RENEW_JITTER = "sasl.kerberos.ticket.renew.jitter";
- public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "Percentage of random jitter added to the renewal time";
+ public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "Percentage of random jitter added to the renewal time.";
public static final double DEFAULT_KERBEROS_TICKET_RENEW_JITTER = 0.05;
public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN = "sasl.kerberos.min.time.before.relogin";
- public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "LoginThread sleep time between refresh attempts";
+ public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "Login thread sleep time between refresh attempts.";
public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
- public static final String AUTH_TO_LOCAL = "kafka.security.auth.to.local";
- public static final String AUTH_TO_LOCAL_DOC = "Rules for the mapping between principal names and operating system user names";
- public static final List<String> DEFAULT_AUTH_TO_LOCAL = Collections.singletonList("DEFAULT");
+ public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = "sasl.kerberos.principal.to.local.rules";
+ public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal names to short names (typically operating system usernames). " +
+ "The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. " +
+ "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
+ public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 148e549..4d52738 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -66,7 +66,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
} catch (Exception ke) {
defaultRealm = "";
}
- kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.AUTH_TO_LOCAL));
+ kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES));
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
this.sslFactory = new SslFactory(mode);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 3929160..654be14 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -109,8 +109,8 @@ public class SaslClientAuthenticator implements Authenticator {
new ClientCallbackHandler());
}
});
- } catch (Exception e) {
- throw new KafkaException("Failed to create SASL client", e);
+ } catch (PrivilegedActionException e) {
+ throw new KafkaException("Failed to create SaslClient", e.getCause());
}
}
@@ -160,9 +160,9 @@ public class SaslClientAuthenticator implements Authenticator {
netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(saslToken));
flushNetOutBufferAndUpdateInterestOps();
}
- } catch (SaslException se) {
+ } catch (IOException e) {
saslState = SaslState.FAILED;
- throw new IOException("Failed to authenticate using SASL " + se);
+ throw e;
}
}
}
@@ -189,9 +189,8 @@ public class SaslClientAuthenticator implements Authenticator {
}
private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
- if (saslToken == null) {
- throw new SaslException("Error in authenticating with a Kafka Broker: the kafka broker saslToken is null.");
- }
+ if (saslToken == null)
+ throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
try {
return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
@@ -200,8 +199,7 @@ public class SaslClientAuthenticator implements Authenticator {
}
});
} catch (PrivilegedActionException e) {
- String error = "An error: (" + e + ") occurred when evaluating Kafka Brokers " +
- " received SASL token.";
+ String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
// Try to provide hints to use about what went wrong so they can fix their configuration.
// TODO: introspect about e: look for GSS information.
final String unknownServerErrorText =
@@ -214,7 +212,8 @@ public class SaslClientAuthenticator implements Authenticator {
" `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
}
error += " Kafka Client will go to AUTH_FAILED state.";
- throw new SaslException(error);
+ //Unwrap the SaslException inside `PrivilegedActionException`
+ throw new SaslException(error, e.getCause());
}
}
@@ -234,7 +233,7 @@ public class SaslClientAuthenticator implements Authenticator {
nc.setName(nc.getDefaultName());
} else if (callback instanceof PasswordCallback) {
// Call `setPassword` once we support obtaining a password from the user and update message below
- LOG.warn("Could not login: the client is being asked for a password, but the Kafka" +
+ throw new UnsupportedCallbackException(callback, "Could not login: the client is being asked for a password, but the Kafka" +
" client code does not currently support obtaining a password from the user." +
" Make sure -Djava.security.auth.login.config property passed to JVM and" +
" the client is configured to use a ticket cache (using" +
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index d06a22a..4104668 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -124,16 +124,12 @@ public class SaslServerAuthenticator implements Authenticator {
try {
return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
- public SaslServer run() {
- try {
- return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
- } catch (SaslException e) {
- throw new KafkaException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e);
- }
+ public SaslServer run() throws SaslException {
+ return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
}
});
} catch (PrivilegedActionException e) {
- throw new KafkaException("Kafka Broker experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context", e);
+ throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
index aef10db..46f0edf 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
@@ -31,18 +31,18 @@ public class KerberosName {
private final String realm;
/* Rules for the translation of the principal name into an operating system name */
- private final List<KerberosRule> authToLocalRules;
+ private final List<KerberosRule> principalToLocalRules;
/**
* Creates an instance of `KerberosName` with the provided parameters.
*/
- public KerberosName(String serviceName, String hostName, String realm, List<KerberosRule> authToLocalRules) {
+ public KerberosName(String serviceName, String hostName, String realm, List<KerberosRule> principalToLocalRules) {
if (serviceName == null)
throw new IllegalArgumentException("serviceName must not be null");
this.serviceName = serviceName;
this.hostName = hostName;
this.realm = realm;
- this.authToLocalRules = authToLocalRules;
+ this.principalToLocalRules = principalToLocalRules;
}
/**
@@ -103,7 +103,7 @@ public class KerberosName {
} else {
params = new String[]{realm, serviceName, hostName};
}
- for (KerberosRule r : authToLocalRules) {
+ for (KerberosRule r : principalToLocalRules) {
String result = r.apply(params);
if (result != null)
return result;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
index 95eb170..eb4e6f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.security.kerberos;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -44,10 +45,11 @@ public class KerberosNameParser {
/**
* The list of translation rules.
*/
- private final List<KerberosRule> authToLocalRules;
+ private final List<KerberosRule> principalToLocalRules;
- public KerberosNameParser(String defaultRealm, List<String> authToLocalRules) {
- this.authToLocalRules = parseRules(defaultRealm, authToLocalRules);
+ public KerberosNameParser(String defaultRealm, List<String> principalToLocalRules) {
+ List<String> rules = principalToLocalRules == null ? Collections.singletonList("DEFAULT") : principalToLocalRules;
+ this.principalToLocalRules = parseRules(defaultRealm, rules);
}
/**
@@ -59,10 +61,10 @@ public class KerberosNameParser {
if (principalName.contains("@")) {
throw new IllegalArgumentException("Malformed Kerberos name: " + principalName);
} else {
- return new KerberosName(principalName, null, null, authToLocalRules);
+ return new KerberosName(principalName, null, null, principalToLocalRules);
}
} else {
- return new KerberosName(match.group(1), match.group(3), match.group(4), authToLocalRules);
+ return new KerberosName(match.group(1), match.group(3), match.group(4), principalToLocalRules);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
index dd885e5..470ab96 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
@@ -258,7 +258,7 @@ public class Login {
}
}
}
- }, false);
+ }, true);
}
public void startThreadIfNeeded() {
@@ -288,20 +288,10 @@ public class Login {
if (jaasConfigFile == null) {
throw new IllegalArgumentException("You must pass " + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + " in secure mode.");
}
-
AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
if (configEntries == null) {
- // Forcing a reload of the configuration in case it's been overridden by third-party code.
- // Without this, our tests fail sometimes depending on the order the tests are executed.
- // Singletons are bad.
- Configuration.setConfiguration(null);
- configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
- if (configEntries == null) {
- String errorMessage = "Could not find a '" + loginContextName + "' entry in `" + jaasConfigFile + "`.";
- throw new IllegalArgumentException(errorMessage);
- } else {
- log.info("Found `" + loginContextName + "` in JAAS configuration after forced reload.");
- }
+ String errorMessage = "Could not find a '" + loginContextName + "' entry in `" + jaasConfigFile + "`.";
+ throw new IllegalArgumentException(errorMessage);
}
LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
index 18651c8..ac31f9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
@@ -21,6 +21,7 @@ package org.apache.kafka.common.security.kerberos;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Map;
@@ -115,4 +116,15 @@ public class LoginManager {
--refCount;
}
}
+
+ /* Should only be used in tests. */
+ public static void closeAll() {
+ synchronized (LoginManager.class) {
+ for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) {
+ LoginManager loginManager = CACHED_INSTANCES.remove(loginType);
+ loginManager.login.shutdown();
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
index 1e09e8b..86c4d1e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -156,11 +156,10 @@ public class DistributedHerderConfig extends AbstractConfig {
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
- .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
- .define(SaslConfigs.AUTH_TO_LOCAL, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, ConfigDef.Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
+ .define(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
.define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
ConfigDef.Type.INT,
40 * 1000,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6b24c29..e52a9d3 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -112,12 +112,13 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
}
val threadName = threadNamePrefix match {
case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
- case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name,config.brokerId, broker.id)
+ case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
}
- val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time, threadName)
+ val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
+ brokerNode, config, time, threadName)
requestThread.setDaemon(false)
- brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, broker, messageQueue, requestThread))
+ brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
@@ -125,7 +126,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
brokerState.networkClient.close()
brokerState.messageQueue.clear()
brokerState.requestSendThread.shutdown()
- brokerStateInfo.remove(brokerState.broker.id)
+ brokerStateInfo.remove(brokerState.brokerNode.id)
} catch {
case e: Throwable => error("Error while removing broker by the controller", e)
}
@@ -142,7 +143,6 @@ case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: Abstra
class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
- val toBroker: Broker,
val queue: BlockingQueue[QueueItem],
val networkClient: NetworkClient,
val brokerNode: Node,
@@ -186,7 +186,7 @@ class RequestSendThread(val controllerId: Int,
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch,
- request.toString, toBroker.toString()), e)
+ request.toString, brokerNode.toString()), e)
networkClient.close(brokerNode.idString)
isSendSuccessful = false
backoff()
@@ -200,7 +200,7 @@ class RequestSendThread(val controllerId: Int,
case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
}
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
- .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+ .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
if (callback != null) {
callback(response)
@@ -209,7 +209,7 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable =>
- error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
+ error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
// If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
networkClient.close(brokerNode.idString)
}
@@ -227,12 +227,12 @@ class RequestSendThread(val controllerId: Int,
if (!ready)
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
- info("Controller %d connected to %s for sending state change requests".format(controllerId, toBroker.toString()))
+ info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
true
}
} catch {
case e: Throwable =>
- warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, toBroker.toString()), e)
+ warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString()), e)
networkClient.close(brokerNode.idString)
false
}
@@ -439,7 +439,6 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
case class ControllerBrokerStateInfo(networkClient: NetworkClient,
brokerNode: Node,
- broker: Broker,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7aba1c9..b749446 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -173,7 +173,7 @@ object Defaults {
val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
- val AuthToLocal = SaslConfigs.DEFAULT_AUTH_TO_LOCAL
+ val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
}
@@ -328,7 +328,7 @@ object KafkaConfig {
val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
val SaslKerberosTicketRenewJitterProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER
val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
- val AuthToLocalProp = SaslConfigs.AUTH_TO_LOCAL
+ val SaslKerberosPrincipalToLocalRulesProp = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
/* Documentation */
/** ********* Zookeeper Configuration ***********/
@@ -503,7 +503,7 @@ object KafkaConfig {
val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC
val SaslKerberosTicketRenewJitterDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC
val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
- val AuthToLocalDoc = SaslConfigs.AUTH_TO_LOCAL_DOC
+ val SaslKerberosPrincipalToLocalRulesDoc = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
private val configDef = {
import ConfigDef.Importance._
@@ -667,7 +667,7 @@ object KafkaConfig {
.define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc)
.define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)
.define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
- .define(AuthToLocalProp, LIST, Defaults.AuthToLocal, MEDIUM, AuthToLocalDoc)
+ .define(SaslKerberosPrincipalToLocalRulesProp, LIST, Defaults.SaslKerberosPrincipalToLocalRules, MEDIUM, SaslKerberosPrincipalToLocalRulesDoc)
}
@@ -836,7 +836,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
- val authToLocal = getList(KafkaConfig.AuthToLocalProp)
+ val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
/** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index f99f0d8..db4295c 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -12,13 +12,12 @@
*/
package kafka.api
-import java.util.regex.Pattern
import java.util
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import kafka.utils.{TestUtils, Logging}
import kafka.server.KafkaConfig
@@ -27,7 +26,6 @@ import java.util.ArrayList
import org.junit.Assert._
import org.junit.{Test, Before}
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import kafka.coordinator.GroupCoordinator
@@ -71,7 +69,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
sendRecords(numRecords)
assertEquals(0, this.consumers(0).assignment.size)
- this.consumers(0).assign(List(tp))
+ this.consumers(0).assign(List(tp).asJava)
assertEquals(1, this.consumers(0).assignment.size)
this.consumers(0).seek(tp, 0)
@@ -88,62 +86,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
@Test
- def testAutoCommitOnClose() {
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-
- val numRecords = 10000
- sendRecords(numRecords)
-
- consumer0.subscribe(List(topic))
-
- val assignment = Set(tp, tp2)
- TestUtils.waitUntilTrue(() => {
- consumer0.poll(50)
- consumer0.assignment() == assignment.asJava
- }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
-
- // should auto-commit seeked positions before closing
- consumer0.seek(tp, 300)
- consumer0.seek(tp2, 500)
- consumer0.close()
-
- // now we should see the committed positions from another consumer
- assertEquals(300, this.consumers(0).committed(tp).offset)
- assertEquals(500, this.consumers(0).committed(tp2).offset)
- }
-
- @Test
- def testAutoCommitOnCloseAfterWakeup() {
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
- val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-
- val numRecords = 10000
- sendRecords(numRecords)
-
- consumer0.subscribe(List(topic))
-
- val assignment = Set(tp, tp2)
- TestUtils.waitUntilTrue(() => {
- consumer0.poll(50)
- consumer0.assignment() == assignment.asJava
- }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
-
- // should auto-commit seeked positions before closing
- consumer0.seek(tp, 300)
- consumer0.seek(tp2, 500)
-
- // wakeup the consumer before closing to simulate trying to break a poll
- // loop from another thread
- consumer0.wakeup()
- consumer0.close()
-
- // now we should see the committed positions from another consumer
- assertEquals(300, this.consumers(0).committed(tp).offset)
- assertEquals(500, this.consumers(0).committed(tp2).offset)
- }
-
- @Test
def testAutoCommitOnRebalance() {
val topic2 = "topic2"
TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
@@ -157,13 +99,13 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
val rebalanceListener = new ConsumerRebalanceListener {
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
// keep partitions paused in this test so that we can verify the commits based on specific seeks
- partitions.foreach(consumer0.pause(_))
+ partitions.asScala.foreach(consumer0.pause(_))
}
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
}
- consumer0.subscribe(List(topic), rebalanceListener)
+ consumer0.subscribe(List(topic).asJava, rebalanceListener)
val assignment = Set(tp, tp2)
TestUtils.waitUntilTrue(() => {
@@ -175,7 +117,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
consumer0.seek(tp2, 500)
// change subscription to trigger rebalance
- consumer0.subscribe(List(topic, topic2), rebalanceListener)
+ consumer0.subscribe(List(topic, topic2).asJava, rebalanceListener)
val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
TestUtils.waitUntilTrue(() => {
@@ -189,97 +131,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
@Test
- def testPatternSubscription() {
- val numRecords = 10000
- sendRecords(numRecords)
-
- val topic1: String = "tblablac" // matches subscribed pattern
- TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
- sendRecords(1000, new TopicPartition(topic1, 0))
- sendRecords(1000, new TopicPartition(topic1, 1))
-
- val topic2: String = "tblablak" // does not match subscribed pattern
- TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
- sendRecords(1000, new TopicPartition(topic2, 0))
- sendRecords(1000, new TopicPartition(topic2, 1))
-
- val topic3: String = "tblab1" // does not match subscribed pattern
- TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
- sendRecords(1000, new TopicPartition(topic3, 0))
- sendRecords(1000, new TopicPartition(topic3, 1))
-
- assertEquals(0, this.consumers(0).assignment().size)
-
- val pattern: Pattern = Pattern.compile("t.*c")
- this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
- this.consumers(0).poll(50)
-
- var subscriptions = Set(
- new TopicPartition(topic, 0),
- new TopicPartition(topic, 1),
- new TopicPartition(topic1, 0),
- new TopicPartition(topic1, 1))
-
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment() == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
-
- val topic4: String = "tsomec" // matches subscribed pattern
- TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
- sendRecords(1000, new TopicPartition(topic4, 0))
- sendRecords(1000, new TopicPartition(topic4, 1))
-
- subscriptions = subscriptions ++ Set(
- new TopicPartition(topic4, 0),
- new TopicPartition(topic4, 1))
-
-
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment() == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
-
- this.consumers(0).unsubscribe()
- assertEquals(0, this.consumers(0).assignment().size)
- }
-
- @Test
- def testPatternUnsubscription() {
- val numRecords = 10000
- sendRecords(numRecords)
-
- val topic1: String = "tblablac" // matches subscribed pattern
- TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
- sendRecords(1000, new TopicPartition(topic1, 0))
- sendRecords(1000, new TopicPartition(topic1, 1))
-
- assertEquals(0, this.consumers(0).assignment().size)
-
- this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
- this.consumers(0).poll(50)
-
- val subscriptions = Set(
- new TopicPartition(topic, 0),
- new TopicPartition(topic, 1),
- new TopicPartition(topic1, 0),
- new TopicPartition(topic1, 1))
-
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment() == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
-
- this.consumers(0).unsubscribe()
- assertEquals(0, this.consumers(0).assignment().size)
- }
-
- @Test
def testCommitSpecifiedOffsets() {
sendRecords(5, tp)
sendRecords(7, tp2)
- this.consumers(0).assign(List(tp, tp2));
+ this.consumers(0).assign(List(tp, tp2).asJava)
// Need to poll to join the group
this.consumers(0).poll(50)
@@ -304,106 +160,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
@Test
- def testAutoOffsetReset() {
- sendRecords(1)
- this.consumers(0).assign(List(tp))
- consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
- }
-
- @Test
- def testSeek() {
- val consumer = this.consumers(0)
- val totalRecords = 50L
- sendRecords(totalRecords.toInt)
- consumer.assign(List(tp))
-
- consumer.seekToEnd(tp)
- assertEquals(totalRecords, consumer.position(tp))
- assertFalse(consumer.poll(totalRecords).iterator().hasNext)
-
- consumer.seekToBeginning(tp)
- assertEquals(0, consumer.position(tp), 0)
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
-
- val mid = totalRecords / 2
- consumer.seek(tp, mid)
- assertEquals(mid, consumer.position(tp))
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
- }
-
- @Test
- def testGroupConsumption() {
- sendRecords(10)
- this.consumers(0).subscribe(List(topic))
- consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
- }
-
-
- @Test
- def testCommitMetadata() {
- this.consumers(0).assign(List(tp))
-
- // sync commit
- val syncMetadata = new OffsetAndMetadata(5, "foo")
- this.consumers(0).commitSync(Map((tp, syncMetadata)))
- assertEquals(syncMetadata, this.consumers(0).committed(tp))
-
- // async commit
- val asyncMetadata = new OffsetAndMetadata(10, "bar")
- val callback = new CountConsumerCommitCallback
- this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback)
- awaitCommitCallback(this.consumers(0), callback)
-
- assertEquals(asyncMetadata, this.consumers(0).committed(tp))
- }
-
- def testPositionAndCommit() {
- sendRecords(5)
-
- // committed() on a partition with no committed offset throws an exception
- intercept[NoOffsetForPartitionException] {
- this.consumers(0).committed(new TopicPartition(topic, 15))
- }
-
- // position() on a partition that we aren't subscribed to throws an exception
- intercept[IllegalArgumentException] {
- this.consumers(0).position(new TopicPartition(topic, 15))
- }
-
- this.consumers(0).assign(List(tp))
-
- assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
- this.consumers(0).commitSync()
- assertEquals(0L, this.consumers(0).committed(tp).offset)
-
- consumeAndVerifyRecords(this.consumers(0), 5, 0)
- assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
- this.consumers(0).commitSync()
- assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
-
- sendRecords(1)
-
- // another consumer in the same group should get the same position
- this.consumers(1).assign(List(tp))
- consumeAndVerifyRecords(this.consumers(1), 1, 5)
- }
-
- @Test
- def testPartitionsFor() {
- val numParts = 2
- TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
- val parts = this.consumers(0).partitionsFor("part-test")
- assertNotNull(parts)
- assertEquals(2, parts.size)
- assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
- }
-
- @Test
def testListTopics() {
val numParts = 2
- val topic1: String = "part-test-topic-1"
- val topic2: String = "part-test-topic-2"
- val topic3: String = "part-test-topic-3"
+ val topic1 = "part-test-topic-1"
+ val topic2 = "part-test-topic-2"
+ val topic3 = "part-test-topic-3"
TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers)
TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
@@ -420,28 +181,28 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
@Test
def testPartitionReassignmentCallback() {
val listener = new TestConsumerReassignmentListener()
- this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
- this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
+ this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
- consumer0.subscribe(List(topic), listener)
-
+ consumer0.subscribe(List(topic).asJava, listener)
+
// the initial subscription should cause a callback execution
- while(listener.callsToAssigned == 0)
+ while (listener.callsToAssigned == 0)
consumer0.poll(50)
-
+
// get metadata for the topic
- var parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
- while(parts == null)
+ var parts: Seq[PartitionInfo] = null
+ while (parts == null)
parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
-
+
// shutdown the coordinator
val coordinator = parts(0).leader().id()
this.servers(coordinator).shutdown()
-
+
// this should cause another callback execution
- while(listener.callsToAssigned < 2)
+ while (listener.callsToAssigned < 2)
consumer0.poll(50)
assertEquals(2, listener.callsToAssigned)
@@ -455,19 +216,19 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
@Test
def testUnsubscribeTopic() {
- this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
- this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
+ this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
try {
val listener = new TestConsumerReassignmentListener()
- consumer0.subscribe(List(topic), listener)
+ consumer0.subscribe(List(topic).asJava, listener)
// the initial subscription should cause a callback execution
while (listener.callsToAssigned == 0)
consumer0.poll(50)
- consumer0.subscribe(List())
+ consumer0.subscribe(List[String]().asJava)
assertEquals(0, consumer0.assignment.size())
} finally {
consumer0.close()
@@ -475,75 +236,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
@Test
- def testExpandingTopicSubscriptions() {
- val otherTopic = "other"
- val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
- val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
- this.consumers(0).subscribe(List(topic))
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
-
- TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
- this.consumers(0).subscribe(List(topic, otherTopic))
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == expandedSubscriptions.asJava
- }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
- }
-
- @Test
- def testShrinkingTopicSubscriptions() {
- val otherTopic = "other"
- TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
- val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
- val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
- this.consumers(0).subscribe(List(topic, otherTopic))
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
-
- this.consumers(0).subscribe(List(topic))
- TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == shrunkenSubscriptions.asJava
- }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
- }
-
- @Test
- def testPartitionPauseAndResume() {
- sendRecords(5)
- this.consumers(0).assign(List(tp))
- consumeAndVerifyRecords(this.consumers(0), 5, 0)
- this.consumers(0).pause(tp)
- sendRecords(5)
- assertTrue(this.consumers(0).poll(0).isEmpty)
- this.consumers(0).resume(tp)
- consumeAndVerifyRecords(this.consumers(0), 5, 5)
- }
-
- @Test
def testPauseStateNotPreservedByRebalance() {
- this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
- this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
+ this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
sendRecords(5)
- consumer0.subscribe(List(topic))
+ consumer0.subscribe(List(topic).asJava)
consumeAndVerifyRecords(consumer0, 5, 0)
consumer0.pause(tp)
// subscribe to a new topic to trigger a rebalance
- consumer0.subscribe(List("topic2"))
+ consumer0.subscribe(List("topic2").asJava)
// after rebalance, our position should be reset and our pause state lost,
// so we should be able to consume from the beginning
consumeAndVerifyRecords(consumer0, 0, 5)
}
- private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
+ protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
var callsToAssigned = 0
var callsToRevoked = 0
def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
@@ -556,17 +267,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
}
- private def sendRecords(numRecords: Int): Unit = {
+ protected def sendRecords(numRecords: Int): Unit = {
sendRecords(numRecords, tp)
}
- private def sendRecords(numRecords: Int, tp: TopicPartition) {
+ protected def sendRecords(numRecords: Int, tp: TopicPartition) {
(0 until numRecords).map { i =>
this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
}.foreach(_.get)
}
- private def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
+ protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
startingKeyAndValueIndex: Int = 0) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
val maxIters = numRecords * 300
@@ -590,7 +301,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
}
- private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+ protected def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
val startCount = commitCallback.count
val started = System.currentTimeMillis()
while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
@@ -598,7 +309,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
assertEquals(startCount + 1, commitCallback.count)
}
- private class CountConsumerCommitCallback extends OffsetCommitCallback {
+ protected class CountConsumerCommitCallback extends OffsetCommitCallback {
var count = 0
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 92c93e6..29291d4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -131,8 +131,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
} finally {
- if (producer != null)
- producer.close()
+ producer.close()
}
}
@@ -184,8 +183,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
} finally {
- if (producer != null)
- producer.close()
+ producer.close()
}
}
@@ -237,8 +235,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertEquals(i.toLong, messageSet1(i).offset)
}
} finally {
- if (producer != null)
- producer.close()
+ producer.close()
}
}
@@ -260,9 +257,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
} finally {
- if (producer != null) {
- producer.close()
- }
+ producer.close()
}
}
@@ -282,8 +277,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertTrue("All requests are complete.", responses.forall(_.isDone()))
}
} finally {
- if (producer != null)
- producer.close()
+ producer.close()
}
}
@@ -292,42 +286,35 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testCloseWithZeroTimeoutFromCallerThread() {
- var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
- try {
- // create topic
- val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
- val leader0 = leaders(0)
- val leader1 = leaders(1)
-
- // create record
- val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
- val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
-
- // Test closing from caller thread.
- for(i <- 0 until 50) {
- producer = createProducer(brokerList, lingerMs = Long.MaxValue)
- val responses = (0 until numRecords) map (i => producer.send(record0))
- assertTrue("No request is complete.", responses.forall(!_.isDone()))
- producer.close(0, TimeUnit.MILLISECONDS)
- responses.foreach { future =>
- try {
- future.get()
- fail("No message should be sent successfully.")
- } catch {
- case e: Exception =>
- assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
- }
- }
- val fetchResponse = if (leader0.get == configs(0).brokerId) {
- consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
- } else {
- consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ // create topic
+ val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+ val leader0 = leaders(0)
+ val leader1 = leaders(1)
+
+ // create record
+ val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+
+ // Test closing from caller thread.
+ for(i <- 0 until 50) {
+ val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+ val responses = (0 until numRecords) map (i => producer.send(record0))
+ assertTrue("No request is complete.", responses.forall(!_.isDone()))
+ producer.close(0, TimeUnit.MILLISECONDS)
+ responses.foreach { future =>
+ try {
+ future.get()
+ fail("No message should be sent successfully.")
+ } catch {
+ case e: Exception =>
+ assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
}
- assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
}
- } finally {
- if (producer != null)
- producer.close()
+ val fetchResponse = if (leader0.get == configs(0).brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ }
+ assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
}
}
@@ -336,48 +323,42 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testCloseWithZeroTimeoutFromSenderThread() {
- var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
- try {
- // create topic
- val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
- val leader = leaders(0)
-
- // create record
- val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
-
- // Test closing from sender thread.
- class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
- override def onCompletion(metadata: RecordMetadata, exception: Exception) {
- // Trigger another batch in accumulator before close the producer. These messages should
- // not be sent.
- (0 until numRecords) map (i => producer.send(record))
- // The close call will be called by all the message callbacks. This tests idempotence of the close call.
- producer.close(0, TimeUnit.MILLISECONDS)
- // Test close with non zero timeout. Should not block at all.
- producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
- }
+ // create topic
+ val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+ val leader = leaders(0)
+
+ // create record
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+
+ // Test closing from sender thread.
+ class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+ override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+ // Trigger another batch in accumulator before close the producer. These messages should
+ // not be sent.
+ (0 until numRecords) map (i => producer.send(record))
+ // The close call will be called by all the message callbacks. This tests idempotence of the close call.
+ producer.close(0, TimeUnit.MILLISECONDS)
+ // Test close with non zero timeout. Should not block at all.
+ producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
}
- for(i <- 0 until 50) {
- producer = createProducer(brokerList, lingerMs = Long.MaxValue)
- // send message to partition 0
- val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
- assertTrue("No request is complete.", responses.forall(!_.isDone()))
- // flush the messages.
- producer.flush()
- assertTrue("All request are complete.", responses.forall(_.isDone()))
- // Check the messages received by broker.
- val fetchResponse = if (leader.get == configs(0).brokerId) {
- consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
- } else {
- consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
- }
- val expectedNumRecords = (i + 1) * numRecords
- assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
- expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+ }
+ for(i <- 0 until 50) {
+ val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+ // send message to partition 0
+ val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
+ assertTrue("No request is complete.", responses.forall(!_.isDone()))
+ // flush the messages.
+ producer.flush()
+ assertTrue("All request are complete.", responses.forall(_.isDone()))
+ // Check the messages received by broker.
+ val fetchResponse = if (leader.get == configs(0).brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
}
- } finally {
- if (producer != null)
- producer.close()
+ val expectedNumRecords = (i + 1) * numRecords
+ assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
+ expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5741ce2..bbc9a54 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -54,15 +54,17 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp()
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+ producerConfig.putAll(TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile))
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
- for(i <- 0 until producerCount)
+ consumerConfig.putAll(TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile))
+ for (i <- 0 until producerCount)
producers += new KafkaProducer(producerConfig)
- for(i <- 0 until consumerCount) {
+ for (i <- 0 until consumerCount) {
consumers += new KafkaConsumer(consumerConfig)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 335d585..eb67599 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,4 +12,304 @@
*/
package kafka.api
-class PlaintextConsumerTest extends BaseConsumerTest
+import java.util.regex.Pattern
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.JavaConverters
+import JavaConverters._
+
+/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
+class PlaintextConsumerTest extends BaseConsumerTest {
+
+ @Test
+ def testAutoCommitOnClose() {
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+ val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+ val numRecords = 10000
+ sendRecords(numRecords)
+
+ consumer0.subscribe(List(topic).asJava)
+
+ val assignment = Set(tp, tp2)
+ TestUtils.waitUntilTrue(() => {
+ consumer0.poll(50)
+ consumer0.assignment() == assignment.asJava
+ }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+ // should auto-commit seeked positions before closing
+ consumer0.seek(tp, 300)
+ consumer0.seek(tp2, 500)
+ consumer0.close()
+
+ // now we should see the committed positions from another consumer
+ assertEquals(300, this.consumers(0).committed(tp).offset)
+ assertEquals(500, this.consumers(0).committed(tp2).offset)
+ }
+
+ @Test
+ def testAutoCommitOnCloseAfterWakeup() {
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+ val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+ val numRecords = 10000
+ sendRecords(numRecords)
+
+ consumer0.subscribe(List(topic).asJava)
+
+ val assignment = Set(tp, tp2)
+ TestUtils.waitUntilTrue(() => {
+ consumer0.poll(50)
+ consumer0.assignment() == assignment.asJava
+ }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+ // should auto-commit seeked positions before closing
+ consumer0.seek(tp, 300)
+ consumer0.seek(tp2, 500)
+
+ // wakeup the consumer before closing to simulate trying to break a poll
+ // loop from another thread
+ consumer0.wakeup()
+ consumer0.close()
+
+ // now we should see the committed positions from another consumer
+ assertEquals(300, this.consumers(0).committed(tp).offset)
+ assertEquals(500, this.consumers(0).committed(tp2).offset)
+ }
+
+ @Test
+ def testAutoOffsetReset() {
+ sendRecords(1)
+ this.consumers(0).assign(List(tp).asJava)
+ consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+ }
+
+ @Test
+ def testGroupConsumption() {
+ sendRecords(10)
+ this.consumers(0).subscribe(List(topic).asJava)
+ consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+ }
+
+ @Test
+ def testPatternSubscription() {
+ val numRecords = 10000
+ sendRecords(numRecords)
+
+ val topic1 = "tblablac" // matches subscribed pattern
+ TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
+ sendRecords(1000, new TopicPartition(topic1, 0))
+ sendRecords(1000, new TopicPartition(topic1, 1))
+
+ val topic2 = "tblablak" // does not match subscribed pattern
+ TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+ sendRecords(1000, new TopicPartition(topic2, 0))
+ sendRecords(1000, new TopicPartition(topic2, 1))
+
+ val topic3 = "tblab1" // does not match subscribed pattern
+ TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
+ sendRecords(1000, new TopicPartition(topic3, 0))
+ sendRecords(1000, new TopicPartition(topic3, 1))
+
+ assertEquals(0, this.consumers(0).assignment().size)
+
+ val pattern = Pattern.compile("t.*c")
+ this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
+ this.consumers(0).poll(50)
+
+ var subscriptions = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment() == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+ val topic4 = "tsomec" // matches subscribed pattern
+ TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
+ sendRecords(1000, new TopicPartition(topic4, 0))
+ sendRecords(1000, new TopicPartition(topic4, 1))
+
+ subscriptions ++= Set(
+ new TopicPartition(topic4, 0),
+ new TopicPartition(topic4, 1))
+
+
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment() == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+ this.consumers(0).unsubscribe()
+ assertEquals(0, this.consumers(0).assignment().size)
+ }
+
+ @Test
+ def testPatternUnsubscription() {
+ val numRecords = 10000
+ sendRecords(numRecords)
+
+ val topic1 = "tblablac" // matches subscribed pattern
+ TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
+ sendRecords(1000, new TopicPartition(topic1, 0))
+ sendRecords(1000, new TopicPartition(topic1, 1))
+
+ assertEquals(0, this.consumers(0).assignment().size)
+
+ this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
+ this.consumers(0).poll(50)
+
+ val subscriptions = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment() == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+ this.consumers(0).unsubscribe()
+ assertEquals(0, this.consumers(0).assignment().size)
+ }
+
+ @Test
+ def testCommitMetadata() {
+ this.consumers(0).assign(List(tp).asJava)
+
+ // sync commit
+ val syncMetadata = new OffsetAndMetadata(5, "foo")
+ this.consumers(0).commitSync(Map((tp, syncMetadata)).asJava)
+ assertEquals(syncMetadata, this.consumers(0).committed(tp))
+
+ // async commit
+ val asyncMetadata = new OffsetAndMetadata(10, "bar")
+ val callback = new CountConsumerCommitCallback
+ this.consumers(0).commitAsync(Map((tp, asyncMetadata)).asJava, callback)
+ awaitCommitCallback(this.consumers(0), callback)
+
+ assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+ }
+
+ @Test
+ def testExpandingTopicSubscriptions() {
+ val otherTopic = "other"
+ val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+ val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+ this.consumers(0).subscribe(List(topic).asJava)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+
+ TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
+ this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment == expandedSubscriptions.asJava
+ }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+ }
+
+ @Test
+ def testShrinkingTopicSubscriptions() {
+ val otherTopic = "other"
+ TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
+ val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+ val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+ this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+
+ this.consumers(0).subscribe(List(topic).asJava)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).assignment == shrunkenSubscriptions.asJava
+ }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+ }
+
+ @Test
+ def testPartitionsFor() {
+ val numParts = 2
+ TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
+ val parts = this.consumers(0).partitionsFor("part-test")
+ assertNotNull(parts)
+ assertEquals(2, parts.size)
+ assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+ }
+
+ @Test
+ def testSeek() {
+ val consumer = this.consumers(0)
+ val totalRecords = 50L
+ sendRecords(totalRecords.toInt)
+ consumer.assign(List(tp).asJava)
+
+ consumer.seekToEnd(tp)
+ assertEquals(totalRecords, consumer.position(tp))
+ assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+ consumer.seekToBeginning(tp)
+ assertEquals(0, consumer.position(tp), 0)
+ consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
+
+ val mid = totalRecords / 2
+ consumer.seek(tp, mid)
+ assertEquals(mid, consumer.position(tp))
+ consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+ }
+
+ @Test
+ def testPositionAndCommit() {
+ sendRecords(5)
+
+ // committed() on a partition with no committed offset throws an exception
+ intercept[NoOffsetForPartitionException] {
+ this.consumers(0).committed(new TopicPartition(topic, 15))
+ }
+
+ // position() on a partition that we aren't subscribed to throws an exception
+ intercept[IllegalArgumentException] {
+ this.consumers(0).position(new TopicPartition(topic, 15))
+ }
+
+ this.consumers(0).assign(List(tp).asJava)
+
+ assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
+ this.consumers(0).commitSync()
+ assertEquals(0L, this.consumers(0).committed(tp).offset)
+
+ consumeAndVerifyRecords(this.consumers(0), 5, 0)
+ assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
+ this.consumers(0).commitSync()
+ assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
+
+ sendRecords(1)
+
+ // another consumer in the same group should get the same position
+ this.consumers(1).assign(List(tp).asJava)
+ consumeAndVerifyRecords(this.consumers(1), 1, 5)
+ }
+
+ @Test
+ def testPartitionPauseAndResume() {
+ sendRecords(5)
+ this.consumers(0).assign(List(tp).asJava)
+ consumeAndVerifyRecords(this.consumers(0), 5, 0)
+ this.consumers(0).pause(tp)
+ sendRecords(5)
+ assertTrue(this.consumers(0).poll(0).isEmpty)
+ this.consumers(0).resume(tp)
+ consumeAndVerifyRecords(this.consumers(0), 5, 5)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 649c927..7990020 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -90,11 +90,10 @@ class QuotasTest extends KafkaServerTestHarness {
// Create consumers
val consumerProps = new Properties
- consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
index 9575fda..14ceb43 100644
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -12,52 +12,70 @@
*/
package kafka.api
-import java.io.{BufferedReader, FileWriter, BufferedWriter, File}
+import java.io.{FileWriter, BufferedWriter, File}
import javax.security.auth.login.Configuration
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.kerberos.LoginManager
import org.junit.{After, Before}
trait SaslTestHarness extends ZooKeeperTestHarness {
- val workDir = new File(System.getProperty("test.dir", "target"))
- val kdcConf = MiniKdc.createConf()
- val kdc = new MiniKdc(kdcConf, workDir)
+
+ private val workDir = new File(System.getProperty("test.dir", "target"))
+ private val kdcConf = MiniKdc.createConf()
+ private val kdc = new MiniKdc(kdcConf, workDir)
@Before
override def setUp() {
- // Clean-up global configuration set by other tests
+ // Important if tests leak consumers, producers or brokers
+ LoginManager.closeAll()
+ val keytabFile = createKeytabAndSetConfiguration("kafka_jaas.conf")
+ kdc.start()
+ kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+ super.setUp
+ }
+
+ protected def createKeytabAndSetConfiguration(jaasResourceName: String): File = {
+ val (keytabFile, jaasFile) = createKeytabAndJaasFiles(jaasResourceName)
+ // This will cause a reload of the Configuration singleton when `getConfiguration` is called
Configuration.setConfiguration(null)
+ System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
+ keytabFile
+ }
+
+ private def createKeytabAndJaasFiles(jaasResourceName: String): (File, File) = {
val keytabFile = TestUtils.tempFile()
val jaasFile = TestUtils.tempFile()
val writer = new BufferedWriter(new FileWriter(jaasFile))
- val source = io.Source.fromInputStream(
- Thread.currentThread().getContextClassLoader.getResourceAsStream("kafka_jaas.conf"), "UTF-8")
- if (source == null)
- throw new IllegalStateException("Could not load `kaas_jaas.conf`, make sure it is in the classpath")
+ val inputStream = Thread.currentThread().getContextClassLoader.getResourceAsStream(jaasResourceName)
+ if (inputStream == null)
+ throw new IllegalStateException(s"Could not find `$jaasResourceName`, make sure it is in the classpath")
+ val source = io.Source.fromInputStream(inputStream, "UTF-8")
for (line <- source.getLines) {
val replaced = line.replaceAll("\\$keytab-location", keytabFile.getAbsolutePath)
writer.write(replaced)
writer.newLine()
}
+
writer.close()
source.close()
- kdc.start()
- kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
- System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
- super.setUp
+ (keytabFile, jaasFile)
}
@After
override def tearDown() {
super.tearDown
kdc.stop()
+ // Important if tests leak consumers, producers or brokers
+ LoginManager.closeAll()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
Configuration.setConfiguration(null)
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 26b86f7..e9784e0 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -52,8 +52,6 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
- def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
-
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
protected def trustStoreFile: Option[File] = None