You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/28 18:48:47 UTC

[2/2] kafka git commit: KAFKA-2675; SASL/Kerberos follow up

KAFKA-2675; SASL/Kerberos follow up

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Sriharsha Chintalapani <sc...@hortonworks.com>, Jun Rao <ju...@gmail.com>

Closes #376 from ijuma/KAFKA-2675-sasl-kerberos-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9855bb9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9855bb9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9855bb9c

Branch: refs/heads/trunk
Commit: 9855bb9c655ebad0ed5586175452056466189b8a
Parents: b6fe164
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Oct 28 10:48:39 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 28 10:48:39 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |   2 -
 .../kafka/clients/producer/ProducerConfig.java  |   2 -
 .../apache/kafka/common/config/SaslConfigs.java |  24 +-
 .../common/network/SaslChannelBuilder.java      |   2 +-
 .../authenticator/SaslClientAuthenticator.java  |  21 +-
 .../authenticator/SaslServerAuthenticator.java  |  10 +-
 .../common/security/kerberos/KerberosName.java  |   8 +-
 .../security/kerberos/KerberosNameParser.java   |  12 +-
 .../kafka/common/security/kerberos/Login.java   |  16 +-
 .../common/security/kerberos/LoginManager.java  |  12 +
 .../distributed/DistributedHerderConfig.java    |   3 +-
 .../controller/ControllerChannelManager.scala   |  21 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  10 +-
 .../kafka/api/BaseConsumerTest.scala            | 357 ++-----------------
 .../kafka/api/BaseProducerSendTest.scala        | 151 ++++----
 .../kafka/api/IntegrationTestHarness.scala      |  10 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 302 +++++++++++++++-
 .../integration/kafka/api/QuotasTest.scala      |   3 +-
 .../integration/kafka/api/SaslTestHarness.scala |  44 ++-
 .../integration/KafkaServerTestHarness.scala    |   2 -
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  31 +-
 22 files changed, 531 insertions(+), 514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 14c54c2..b366efd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -304,11 +304,9 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
                                 .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
-                                .define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         40 * 1000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6d40b77..e9d9fad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -282,11 +282,9 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
                                 .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
-                                .define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 0abefe7..657c6d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -21,34 +21,32 @@ public class SaslConfigs {
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
      */
 
-    public static final String SASL_KAFKA_SERVER_REALM = "sasl.kafka.server.realm";
-    public static final String SASL_KAFKA_SERVER_DOC = "The sasl kafka server realm. "
-        + "Default will be from kafka jaas config";
-
     public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
     public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
-            + "This can be defined either in the JAAS config or in the Kakfa config.";
+        + "This can be defined either in Kafka's JAAS config or in Kafka's config.";
 
     public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
     public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path. "
-        + "Default will be /usr/bin/kinit";
+        + "Default is /usr/bin/kinit";
     public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
 
     public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = "sasl.kerberos.ticket.renew.window.factor";
-    public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "LoginThread will sleep until specified window factor of time from last refresh"
-        + " to ticket's expiry has been reached, at which time it will wake and try to renew the ticket.";
+    public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "Login thread will sleep until the specified window factor of time from last refresh"
+        + " to ticket's expiry has been reached, at which time it will try to renew the ticket.";
     public static final double DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = 0.80;
 
     public static final String SASL_KERBEROS_TICKET_RENEW_JITTER = "sasl.kerberos.ticket.renew.jitter";
-    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "Percentage of random jitter added to the renewal time";
+    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "Percentage of random jitter added to the renewal time.";
     public static final double DEFAULT_KERBEROS_TICKET_RENEW_JITTER = 0.05;
 
     public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN = "sasl.kerberos.min.time.before.relogin";
-    public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "LoginThread sleep time between refresh attempts";
+    public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "Login thread sleep time between refresh attempts.";
     public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
 
-    public static final String AUTH_TO_LOCAL = "kafka.security.auth.to.local";
-    public static final String AUTH_TO_LOCAL_DOC = "Rules for the mapping between principal names and operating system user names";
-    public static final List<String> DEFAULT_AUTH_TO_LOCAL = Collections.singletonList("DEFAULT");
+    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = "sasl.kerberos.principal.to.local.rules";
+    public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal names to short names (typically operating system usernames). " +
+            "The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. " +
+            "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
+    public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 148e549..4d52738 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -66,7 +66,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             } catch (Exception ke) {
                 defaultRealm = "";
             }
-            kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.AUTH_TO_LOCAL));
+            kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES));
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                 this.sslFactory = new SslFactory(mode);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 3929160..654be14 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -109,8 +109,8 @@ public class SaslClientAuthenticator implements Authenticator {
                             new ClientCallbackHandler());
                 }
             });
-        } catch (Exception e) {
-            throw new KafkaException("Failed to create SASL client", e);
+        } catch (PrivilegedActionException e) {
+            throw new KafkaException("Failed to create SaslClient", e.getCause());
         }
     }
 
@@ -160,9 +160,9 @@ public class SaslClientAuthenticator implements Authenticator {
                     netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(saslToken));
                     flushNetOutBufferAndUpdateInterestOps();
                 }
-            } catch (SaslException se) {
+            } catch (IOException e) {
                 saslState = SaslState.FAILED;
-                throw new IOException("Failed to authenticate using SASL " + se);
+                throw e;
             }
         }
     }
@@ -189,9 +189,8 @@ public class SaslClientAuthenticator implements Authenticator {
     }
 
     private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
-        if (saslToken == null) {
-            throw new SaslException("Error in authenticating with a Kafka Broker: the kafka broker saslToken is null.");
-        }
+        if (saslToken == null)
+            throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
 
         try {
             return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
@@ -200,8 +199,7 @@ public class SaslClientAuthenticator implements Authenticator {
                 }
             });
         } catch (PrivilegedActionException e) {
-            String error = "An error: (" + e + ") occurred when evaluating Kafka Brokers " +
-                      " received SASL token.";
+            String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
             // Try to provide hints to use about what went wrong so they can fix their configuration.
             // TODO: introspect about e: look for GSS information.
             final String unknownServerErrorText =
@@ -214,7 +212,8 @@ public class SaslClientAuthenticator implements Authenticator {
                     " `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
             }
             error += " Kafka Client will go to AUTH_FAILED state.";
-            throw new SaslException(error);
+            //Unwrap the SaslException inside `PrivilegedActionException`
+            throw new SaslException(error, e.getCause());
         }
     }
 
@@ -234,7 +233,7 @@ public class SaslClientAuthenticator implements Authenticator {
                     nc.setName(nc.getDefaultName());
                 } else if (callback instanceof PasswordCallback) {
                     // Call `setPassword` once we support obtaining a password from the user and update message below
-                    LOG.warn("Could not login: the client is being asked for a password, but the Kafka" +
+                    throw new UnsupportedCallbackException(callback, "Could not login: the client is being asked for a password, but the Kafka" +
                              " client code does not currently support obtaining a password from the user." +
                              " Make sure -Djava.security.auth.login.config property passed to JVM and" +
                              " the client is configured to use a ticket cache (using" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index d06a22a..4104668 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -124,16 +124,12 @@ public class SaslServerAuthenticator implements Authenticator {
 
         try {
             return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
-                public SaslServer run() {
-                    try {
-                        return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
-                    } catch (SaslException e) {
-                        throw new KafkaException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e);
-                    }
+                public SaslServer run() throws SaslException {
+                    return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
                 }
             });
         } catch (PrivilegedActionException e) {
-            throw new KafkaException("Kafka Broker experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context", e);
+            throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
index aef10db..46f0edf 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
@@ -31,18 +31,18 @@ public class KerberosName {
     private final String realm;
 
     /* Rules for the translation of the principal name into an operating system name */
-    private final List<KerberosRule> authToLocalRules;
+    private final List<KerberosRule> principalToLocalRules;
 
     /**
      * Creates an instance of `KerberosName` with the provided parameters.
      */
-    public KerberosName(String serviceName, String hostName, String realm, List<KerberosRule> authToLocalRules) {
+    public KerberosName(String serviceName, String hostName, String realm, List<KerberosRule> principalToLocalRules) {
         if (serviceName == null)
             throw new IllegalArgumentException("serviceName must not be null");
         this.serviceName = serviceName;
         this.hostName = hostName;
         this.realm = realm;
-        this.authToLocalRules = authToLocalRules;
+        this.principalToLocalRules = principalToLocalRules;
     }
 
     /**
@@ -103,7 +103,7 @@ public class KerberosName {
         } else {
             params = new String[]{realm, serviceName, hostName};
         }
-        for (KerberosRule r : authToLocalRules) {
+        for (KerberosRule r : principalToLocalRules) {
             String result = r.apply(params);
             if (result != null)
                 return result;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
index 95eb170..eb4e6f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.security.kerberos;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -44,10 +45,11 @@ public class KerberosNameParser {
     /**
      * The list of translation rules.
      */
-    private final List<KerberosRule> authToLocalRules;
+    private final List<KerberosRule> principalToLocalRules;
 
-    public KerberosNameParser(String defaultRealm, List<String> authToLocalRules) {
-        this.authToLocalRules = parseRules(defaultRealm, authToLocalRules);
+    public KerberosNameParser(String defaultRealm, List<String> principalToLocalRules) {
+        List<String> rules = principalToLocalRules == null ? Collections.singletonList("DEFAULT") : principalToLocalRules;
+        this.principalToLocalRules = parseRules(defaultRealm, rules);
     }
 
     /**
@@ -59,10 +61,10 @@ public class KerberosNameParser {
             if (principalName.contains("@")) {
                 throw new IllegalArgumentException("Malformed Kerberos name: " + principalName);
             } else {
-                return new KerberosName(principalName, null, null, authToLocalRules);
+                return new KerberosName(principalName, null, null, principalToLocalRules);
             }
         } else {
-            return new KerberosName(match.group(1), match.group(3), match.group(4), authToLocalRules);
+            return new KerberosName(match.group(1), match.group(3), match.group(4), principalToLocalRules);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
index dd885e5..470ab96 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
@@ -258,7 +258,7 @@ public class Login {
                     }
                 }
             }
-        }, false);
+        }, true);
     }
 
     public void startThreadIfNeeded() {
@@ -288,20 +288,10 @@ public class Login {
         if (jaasConfigFile == null) {
             throw new IllegalArgumentException("You must pass " + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + " in secure mode.");
         }
-
         AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
         if (configEntries == null) {
-            // Forcing a reload of the configuration in case it's been overridden by third-party code.
-            // Without this, our tests fail sometimes depending on the order the tests are executed.
-            // Singletons are bad.
-            Configuration.setConfiguration(null);
-            configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
-            if (configEntries == null) {
-                String errorMessage = "Could not find a '" + loginContextName + "' entry in `" + jaasConfigFile + "`.";
-                throw new IllegalArgumentException(errorMessage);
-            } else {
-                log.info("Found `" + loginContextName + "` in JAAS configuration after forced reload.");
-            }
+            String errorMessage = "Could not find a '" + loginContextName + "' entry in `" + jaasConfigFile + "`.";
+            throw new IllegalArgumentException(errorMessage);
         }
 
         LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
index 18651c8..ac31f9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
@@ -21,6 +21,7 @@ package org.apache.kafka.common.security.kerberos;
 import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.Map;
 
@@ -115,4 +116,15 @@ public class LoginManager {
             --refCount;
         }
     }
+
+    /* Should only be used in tests. */
+    public static void closeAll() {
+        synchronized (LoginManager.class) {
+            for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) {
+                LoginManager loginManager = CACHED_INSTANCES.remove(loginType);
+                loginManager.login.shutdown();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
index 1e09e8b..86c4d1e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -156,11 +156,10 @@ public class DistributedHerderConfig extends AbstractConfig {
                 .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                 .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
-                .define(SaslConfigs.AUTH_TO_LOCAL, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, ConfigDef.Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
                 .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
                         ConfigDef.Type.INT,
                         40 * 1000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6b24c29..e52a9d3 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -112,12 +112,13 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     }
     val threadName = threadNamePrefix match {
       case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
-      case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name,config.brokerId, broker.id)
+      case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
     }
 
-    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time, threadName)
+    val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
+      brokerNode, config, time, threadName)
     requestThread.setDaemon(false)
-    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, broker, messageQueue, requestThread))
+    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
   }
 
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
@@ -125,7 +126,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
       brokerState.requestSendThread.shutdown()
-      brokerStateInfo.remove(brokerState.broker.id)
+      brokerStateInfo.remove(brokerState.brokerNode.id)
     } catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
     }
@@ -142,7 +143,6 @@ case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: Abstra
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
-                        val toBroker: Broker,
                         val queue: BlockingQueue[QueueItem],
                         val networkClient: NetworkClient,
                         val brokerNode: Node,
@@ -186,7 +186,7 @@ class RequestSendThread(val controllerId: Int,
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
               warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                  request.toString, toBroker.toString()), e)
+                  request.toString, brokerNode.toString()), e)
               networkClient.close(brokerNode.idString)
               isSendSuccessful = false
               backoff()
@@ -200,7 +200,7 @@ class RequestSendThread(val controllerId: Int,
             case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
           }
           stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-            .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+            .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
 
           if (callback != null) {
             callback(response)
@@ -209,7 +209,7 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
+        error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
         // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
         networkClient.close(brokerNode.idString)
     }
@@ -227,12 +227,12 @@ class RequestSendThread(val controllerId: Int,
         if (!ready)
           throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
-        info("Controller %d connected to %s for sending state change requests".format(controllerId, toBroker.toString()))
+        info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
         true
       }
     } catch {
       case e: Throwable =>
-        warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, toBroker.toString()), e)
+        warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString()), e)
         networkClient.close(brokerNode.idString)
         false
     }
@@ -439,7 +439,6 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 
 case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      brokerNode: Node,
-                                     broker: Broker,
                                      messageQueue: BlockingQueue[QueueItem],
                                      requestSendThread: RequestSendThread)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7aba1c9..b749446 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -173,7 +173,7 @@ object Defaults {
   val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
   val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
-  val AuthToLocal = SaslConfigs.DEFAULT_AUTH_TO_LOCAL
+  val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
 
 }
 
@@ -328,7 +328,7 @@ object KafkaConfig {
   val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
   val SaslKerberosTicketRenewJitterProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
-  val AuthToLocalProp = SaslConfigs.AUTH_TO_LOCAL
+  val SaslKerberosPrincipalToLocalRulesProp = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
 
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
@@ -503,7 +503,7 @@ object KafkaConfig {
   val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC
   val SaslKerberosTicketRenewJitterDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC
   val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
-  val AuthToLocalDoc = SaslConfigs.AUTH_TO_LOCAL_DOC
+  val SaslKerberosPrincipalToLocalRulesDoc = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
 
   private val configDef = {
     import ConfigDef.Importance._
@@ -667,7 +667,7 @@ object KafkaConfig {
       .define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc)
       .define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)
       .define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
-      .define(AuthToLocalProp, LIST, Defaults.AuthToLocal, MEDIUM, AuthToLocalDoc)
+      .define(SaslKerberosPrincipalToLocalRulesProp, LIST, Defaults.SaslKerberosPrincipalToLocalRules, MEDIUM, SaslKerberosPrincipalToLocalRulesDoc)
 
   }
 
@@ -836,7 +836,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
   val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
   val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
-  val authToLocal = getList(KafkaConfig.AuthToLocalProp)
+  val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
 
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index f99f0d8..db4295c 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -12,13 +12,12 @@
  */
 package kafka.api
 
-import java.util.regex.Pattern
 import java.util
 
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
 import kafka.utils.{TestUtils, Logging}
 import kafka.server.KafkaConfig
@@ -27,7 +26,6 @@ import java.util.ArrayList
 import org.junit.Assert._
 import org.junit.{Test, Before}
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import kafka.coordinator.GroupCoordinator
 
@@ -71,7 +69,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(numRecords)
 
     assertEquals(0, this.consumers(0).assignment.size)
-    this.consumers(0).assign(List(tp))
+    this.consumers(0).assign(List(tp).asJava)
     assertEquals(1, this.consumers(0).assignment.size)
     
     this.consumers(0).seek(tp, 0)
@@ -88,62 +86,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
-  def testAutoCommitOnClose() {
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    consumer0.subscribe(List(topic))
-
-    val assignment = Set(tp, tp2)
-    TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
-
-    // should auto-commit seeked positions before closing
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
-    consumer0.close()
-
-    // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers(0).committed(tp).offset)
-    assertEquals(500, this.consumers(0).committed(tp2).offset)
-  }
-
-  @Test
-  def testAutoCommitOnCloseAfterWakeup() {
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    consumer0.subscribe(List(topic))
-
-    val assignment = Set(tp, tp2)
-    TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
-
-    // should auto-commit seeked positions before closing
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
-
-    // wakeup the consumer before closing to simulate trying to break a poll
-    // loop from another thread
-    consumer0.wakeup()
-    consumer0.close()
-
-    // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers(0).committed(tp).offset)
-    assertEquals(500, this.consumers(0).committed(tp2).offset)
-  }
-
-  @Test
   def testAutoCommitOnRebalance() {
     val topic2 = "topic2"
     TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
@@ -157,13 +99,13 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     val rebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         // keep partitions paused in this test so that we can verify the commits based on specific seeks
-        partitions.foreach(consumer0.pause(_))
+        partitions.asScala.foreach(consumer0.pause(_))
       }
 
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
     }
 
-    consumer0.subscribe(List(topic), rebalanceListener)
+    consumer0.subscribe(List(topic).asJava, rebalanceListener)
 
     val assignment = Set(tp, tp2)
     TestUtils.waitUntilTrue(() => {
@@ -175,7 +117,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.seek(tp2, 500)
 
     // change subscription to trigger rebalance
-    consumer0.subscribe(List(topic, topic2), rebalanceListener)
+    consumer0.subscribe(List(topic, topic2).asJava, rebalanceListener)
 
     val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
     TestUtils.waitUntilTrue(() => {
@@ -189,97 +131,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
-  def testPatternSubscription() {
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    val topic1: String = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic1, 0))
-    sendRecords(1000, new TopicPartition(topic1, 1))
-
-    val topic2: String = "tblablak" // does not match subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic2, 0))
-    sendRecords(1000, new TopicPartition(topic2, 1))
-
-    val topic3: String = "tblab1" // does not match subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic3, 0))
-    sendRecords(1000, new TopicPartition(topic3, 1))
-
-    assertEquals(0, this.consumers(0).assignment().size)
-
-    val pattern: Pattern = Pattern.compile("t.*c")
-    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
-    this.consumers(0).poll(50)
-
-    var subscriptions = Set(
-      new TopicPartition(topic, 0),
-      new TopicPartition(topic, 1),
-      new TopicPartition(topic1, 0),
-      new TopicPartition(topic1, 1))
-
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
-
-    val topic4: String = "tsomec" // matches subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic4, 0))
-    sendRecords(1000, new TopicPartition(topic4, 1))
-
-    subscriptions = subscriptions ++ Set(
-      new TopicPartition(topic4, 0),
-      new TopicPartition(topic4, 1))
-
-
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
-
-    this.consumers(0).unsubscribe()
-    assertEquals(0, this.consumers(0).assignment().size)
-  }
-
-  @Test
-  def testPatternUnsubscription() {
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    val topic1: String = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic1, 0))
-    sendRecords(1000, new TopicPartition(topic1, 1))
-
-    assertEquals(0, this.consumers(0).assignment().size)
-
-    this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
-    this.consumers(0).poll(50)
-
-    val subscriptions = Set(
-      new TopicPartition(topic, 0),
-      new TopicPartition(topic, 1),
-      new TopicPartition(topic1, 0),
-      new TopicPartition(topic1, 1))
-
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
-
-    this.consumers(0).unsubscribe()
-    assertEquals(0, this.consumers(0).assignment().size)
-  }
-
-  @Test
   def testCommitSpecifiedOffsets() {
     sendRecords(5, tp)
     sendRecords(7, tp2)
 
-    this.consumers(0).assign(List(tp, tp2));
+    this.consumers(0).assign(List(tp, tp2).asJava)
 
     // Need to poll to join the group
     this.consumers(0).poll(50)
@@ -304,106 +160,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
-  def testAutoOffsetReset() {
-    sendRecords(1)
-    this.consumers(0).assign(List(tp))
-    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
-  }
-
-  @Test
-  def testSeek() {
-    val consumer = this.consumers(0)
-    val totalRecords = 50L
-    sendRecords(totalRecords.toInt)
-    consumer.assign(List(tp))
-
-    consumer.seekToEnd(tp)
-    assertEquals(totalRecords, consumer.position(tp))
-    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
-
-    consumer.seekToBeginning(tp)
-    assertEquals(0, consumer.position(tp), 0)
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
-
-    val mid = totalRecords / 2
-    consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
-  }
-
-  @Test
-  def testGroupConsumption() {
-    sendRecords(10)
-    this.consumers(0).subscribe(List(topic))
-    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
-  }
-
-
-  @Test
-  def testCommitMetadata() {
-    this.consumers(0).assign(List(tp))
-
-    // sync commit
-    val syncMetadata = new OffsetAndMetadata(5, "foo")
-    this.consumers(0).commitSync(Map((tp, syncMetadata)))
-    assertEquals(syncMetadata, this.consumers(0).committed(tp))
-
-    // async commit
-    val asyncMetadata = new OffsetAndMetadata(10, "bar")
-    val callback = new CountConsumerCommitCallback
-    this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback)
-    awaitCommitCallback(this.consumers(0), callback)
-
-    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
-  }
-
-  def testPositionAndCommit() {
-    sendRecords(5)
-
-    // committed() on a partition with no committed offset throws an exception
-    intercept[NoOffsetForPartitionException] {
-      this.consumers(0).committed(new TopicPartition(topic, 15))
-    }
-
-    // position() on a partition that we aren't subscribed to throws an exception
-    intercept[IllegalArgumentException] {
-      this.consumers(0).position(new TopicPartition(topic, 15))
-    }
-
-    this.consumers(0).assign(List(tp))
-
-    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals(0L, this.consumers(0).committed(tp).offset)
-
-    consumeAndVerifyRecords(this.consumers(0), 5, 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
-
-    sendRecords(1)
-
-    // another consumer in the same group should get the same position
-    this.consumers(1).assign(List(tp))
-    consumeAndVerifyRecords(this.consumers(1), 1, 5)
-  }
-
-  @Test
-  def testPartitionsFor() {
-    val numParts = 2
-    TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
-    val parts = this.consumers(0).partitionsFor("part-test")
-    assertNotNull(parts)
-    assertEquals(2, parts.size)
-    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
-  }
-
-  @Test
   def testListTopics() {
     val numParts = 2
-    val topic1: String = "part-test-topic-1"
-    val topic2: String = "part-test-topic-2"
-    val topic3: String = "part-test-topic-3"
+    val topic1 = "part-test-topic-1"
+    val topic2 = "part-test-topic-2"
+    val topic3 = "part-test-topic-3"
     TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
     TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers)
     TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
@@ -420,28 +181,28 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testPartitionReassignmentCallback() {
     val listener = new TestConsumerReassignmentListener()
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumer0.subscribe(List(topic), listener)
-        
+    consumer0.subscribe(List(topic).asJava, listener)
+
     // the initial subscription should cause a callback execution
-    while(listener.callsToAssigned == 0)
+    while (listener.callsToAssigned == 0)
       consumer0.poll(50)
-    
+
     // get metadata for the topic
-    var parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
-    while(parts == null)
+    var parts: Seq[PartitionInfo] = null
+    while (parts == null)
       parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
-    
+
     // shutdown the coordinator
     val coordinator = parts(0).leader().id()
     this.servers(coordinator).shutdown()
-    
+
     // this should cause another callback execution
-    while(listener.callsToAssigned < 2)
+    while (listener.callsToAssigned < 2)
       consumer0.poll(50)
 
     assertEquals(2, listener.callsToAssigned)
@@ -455,19 +216,19 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testUnsubscribeTopic() {
 
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
     try {
       val listener = new TestConsumerReassignmentListener()
-      consumer0.subscribe(List(topic), listener)
+      consumer0.subscribe(List(topic).asJava, listener)
 
       // the initial subscription should cause a callback execution
       while (listener.callsToAssigned == 0)
         consumer0.poll(50)
 
-      consumer0.subscribe(List())
+      consumer0.subscribe(List[String]().asJava)
       assertEquals(0, consumer0.assignment.size())
     } finally {
       consumer0.close()
@@ -475,75 +236,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
-  def testExpandingTopicSubscriptions() {
-    val otherTopic = "other"
-    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
-    this.consumers(0).subscribe(List(topic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
-
-    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
-    this.consumers(0).subscribe(List(topic, otherTopic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == expandedSubscriptions.asJava
-    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
-  }
-
-  @Test
-  def testShrinkingTopicSubscriptions() {
-    val otherTopic = "other"
-    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
-    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
-    val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    this.consumers(0).subscribe(List(topic, otherTopic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
-
-    this.consumers(0).subscribe(List(topic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == shrunkenSubscriptions.asJava
-    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
-  }
-
-  @Test
-  def testPartitionPauseAndResume() {
-    sendRecords(5)
-    this.consumers(0).assign(List(tp))
-    consumeAndVerifyRecords(this.consumers(0), 5, 0)
-    this.consumers(0).pause(tp)
-    sendRecords(5)
-    assertTrue(this.consumers(0).poll(0).isEmpty)
-    this.consumers(0).resume(tp)
-    consumeAndVerifyRecords(this.consumers(0), 5, 5)
-  }
-
-  @Test
   def testPauseStateNotPreservedByRebalance() {
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
     sendRecords(5)
-    consumer0.subscribe(List(topic))
+    consumer0.subscribe(List(topic).asJava)
     consumeAndVerifyRecords(consumer0, 5, 0)
     consumer0.pause(tp)
 
     // subscribe to a new topic to trigger a rebalance
-    consumer0.subscribe(List("topic2"))
+    consumer0.subscribe(List("topic2").asJava)
 
     // after rebalance, our position should be reset and our pause state lost,
     // so we should be able to consume from the beginning
     consumeAndVerifyRecords(consumer0, 0, 5)
   }
 
-  private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
+  protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
     def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
@@ -556,17 +267,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
-  private def sendRecords(numRecords: Int): Unit = {
+  protected def sendRecords(numRecords: Int): Unit = {
     sendRecords(numRecords, tp)
   }
 
-  private def sendRecords(numRecords: Int, tp: TopicPartition) {
+  protected def sendRecords(numRecords: Int, tp: TopicPartition) {
     (0 until numRecords).map { i =>
       this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
     }.foreach(_.get)
   }
 
-  private def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
+  protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
                                       startingKeyAndValueIndex: Int = 0) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
     val maxIters = numRecords * 300
@@ -590,7 +301,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
-  private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+  protected def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
     val startCount = commitCallback.count
     val started = System.currentTimeMillis()
     while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
@@ -598,7 +309,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(startCount + 1, commitCallback.count)
   }
 
-  private class CountConsumerCommitCallback extends OffsetCommitCallback {
+  protected class CountConsumerCommitCallback extends OffsetCommitCallback {
     var count = 0
 
     override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 92c93e6..29291d4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -131,8 +131,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
 
     } finally {
-      if (producer != null)
-        producer.close()
+      producer.close()
     }
   }
 
@@ -184,8 +183,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
 
     } finally {
-      if (producer != null)
-        producer.close()
+      producer.close()
     }
   }
 
@@ -237,8 +235,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         assertEquals(i.toLong, messageSet1(i).offset)
       }
     } finally {
-      if (producer != null)
-        producer.close()
+      producer.close()
     }
   }
 
@@ -260,9 +257,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
 
     } finally {
-      if (producer != null) {
-        producer.close()
-      }
+      producer.close()
     }
   }
 
@@ -282,8 +277,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         assertTrue("All requests are complete.", responses.forall(_.isDone()))
       }
     } finally {
-      if (producer != null)
-        producer.close()
+      producer.close()
     }
   }
 
@@ -292,42 +286,35 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testCloseWithZeroTimeoutFromCallerThread() {
-    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
-    try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-      val leader0 = leaders(0)
-      val leader1 = leaders(1)
-
-      // create record
-      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
-      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
-
-      // Test closing from caller thread.
-      for(i <- 0 until 50) {
-        producer = createProducer(brokerList, lingerMs = Long.MaxValue)
-        val responses = (0 until numRecords) map (i => producer.send(record0))
-        assertTrue("No request is complete.", responses.forall(!_.isDone()))
-        producer.close(0, TimeUnit.MILLISECONDS)
-        responses.foreach { future =>
-          try {
-            future.get()
-            fail("No message should be sent successfully.")
-          } catch {
-            case e: Exception =>
-              assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
-          }
-        }
-        val fetchResponse = if (leader0.get == configs(0).brokerId) {
-          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-        } else {
-          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+    // create topic
+    val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+    val leader0 = leaders(0)
+    val leader1 = leaders(1)
+
+    // create record
+    val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+
+    // Test closing from caller thread.
+    for(i <- 0 until 50) {
+      val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+      val responses = (0 until numRecords) map (i => producer.send(record0))
+      assertTrue("No request is complete.", responses.forall(!_.isDone()))
+      producer.close(0, TimeUnit.MILLISECONDS)
+      responses.foreach { future =>
+        try {
+          future.get()
+          fail("No message should be sent successfully.")
+        } catch {
+          case e: Exception =>
+            assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
         }
-        assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
       }
-    } finally {
-      if (producer != null)
-        producer.close()
+      val fetchResponse = if (leader0.get == configs(0).brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+      }
+      assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
     }
   }
 
@@ -336,48 +323,42 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testCloseWithZeroTimeoutFromSenderThread() {
-    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
-    try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-      val leader = leaders(0)
-
-      // create record
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
-
-      // Test closing from sender thread.
-      class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
-        override def onCompletion(metadata: RecordMetadata, exception: Exception) {
-          // Trigger another batch in accumulator before close the producer. These messages should
-          // not be sent.
-          (0 until numRecords) map (i => producer.send(record))
-          // The close call will be called by all the message callbacks. This tests idempotence of the close call.
-          producer.close(0, TimeUnit.MILLISECONDS)
-          // Test close with non zero timeout. Should not block at all.
-          producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
-        }
+    // create topic
+    val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+    val leader = leaders(0)
+
+    // create record
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+
+    // Test closing from sender thread.
+    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+      override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+        // Trigger another batch in accumulator before close the producer. These messages should
+        // not be sent.
+        (0 until numRecords) map (i => producer.send(record))
+        // The close call will be called by all the message callbacks. This tests idempotence of the close call.
+        producer.close(0, TimeUnit.MILLISECONDS)
+        // Test close with non zero timeout. Should not block at all.
+        producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
       }
-      for(i <- 0 until 50) {
-        producer = createProducer(brokerList, lingerMs = Long.MaxValue)
-        // send message to partition 0
-        val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
-        assertTrue("No request is complete.", responses.forall(!_.isDone()))
-        // flush the messages.
-        producer.flush()
-        assertTrue("All request are complete.", responses.forall(_.isDone()))
-        // Check the messages received by broker.
-        val fetchResponse = if (leader.get == configs(0).brokerId) {
-          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-        } else {
-          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-        }
-        val expectedNumRecords = (i + 1) * numRecords
-        assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
-          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+    }
+    for(i <- 0 until 50) {
+      val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+      // send message to partition 0
+      val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
+      assertTrue("No request is complete.", responses.forall(!_.isDone()))
+      // flush the messages.
+      producer.flush()
+      assertTrue("All request are complete.", responses.forall(_.isDone()))
+      // Check the messages received by broker.
+      val fetchResponse = if (leader.get == configs(0).brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
       }
-    } finally {
-      if (producer != null)
-        producer.close()
+      val expectedNumRecords = (i + 1) * numRecords
+      assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
+        expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5741ce2..bbc9a54 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -54,15 +54,17 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+    producerConfig.putAll(TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile))
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    for(i <- 0 until producerCount)
+    consumerConfig.putAll(TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile))
+    for (i <- 0 until producerCount)
       producers += new KafkaProducer(producerConfig)
-    for(i <- 0 until consumerCount) {
+    for (i <- 0 until consumerCount) {
       consumers += new KafkaConsumer(consumerConfig)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 335d585..eb67599 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,4 +12,304 @@
   */
 package kafka.api
 
-class PlaintextConsumerTest extends BaseConsumerTest
+import java.util.regex.Pattern
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.JavaConverters
+import JavaConverters._
+
+/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
+class PlaintextConsumerTest extends BaseConsumerTest {
+
+  @Test
+  def testAutoCommitOnClose() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic).asJava)
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // should auto-commit seeked positions before closing
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+    consumer0.close()
+
+    // now we should see the committed positions from another consumer
+    assertEquals(300, this.consumers(0).committed(tp).offset)
+    assertEquals(500, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
+  def testAutoCommitOnCloseAfterWakeup() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic).asJava)
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // should auto-commit seeked positions before closing
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+
+    // wakeup the consumer before closing to simulate trying to break a poll
+    // loop from another thread
+    consumer0.wakeup()
+    consumer0.close()
+
+    // now we should see the committed positions from another consumer
+    assertEquals(300, this.consumers(0).committed(tp).offset)
+    assertEquals(500, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
+  def testAutoOffsetReset() {
+    sendRecords(1)
+    this.consumers(0).assign(List(tp).asJava)
+    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testGroupConsumption() {
+    sendRecords(10)
+    this.consumers(0).subscribe(List(topic).asJava)
+    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testPatternSubscription() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    val topic1 = "tblablac" // matches subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic1, 0))
+    sendRecords(1000, new TopicPartition(topic1, 1))
+
+    val topic2 = "tblablak" // does not match subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic2, 0))
+    sendRecords(1000, new TopicPartition(topic2, 1))
+
+    val topic3 = "tblab1" // does not match subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic3, 0))
+    sendRecords(1000, new TopicPartition(topic3, 1))
+
+    assertEquals(0, this.consumers(0).assignment().size)
+
+    val pattern = Pattern.compile("t.*c")
+    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
+    this.consumers(0).poll(50)
+
+    var subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+    val topic4 = "tsomec" // matches subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic4, 0))
+    sendRecords(1000, new TopicPartition(topic4, 1))
+
+    subscriptions ++= Set(
+      new TopicPartition(topic4, 0),
+      new TopicPartition(topic4, 1))
+
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+    this.consumers(0).unsubscribe()
+    assertEquals(0, this.consumers(0).assignment().size)
+  }
+
+  @Test
+  def testPatternUnsubscription() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    val topic1 = "tblablac" // matches subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic1, 0))
+    sendRecords(1000, new TopicPartition(topic1, 1))
+
+    assertEquals(0, this.consumers(0).assignment().size)
+
+    this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
+    this.consumers(0).poll(50)
+
+    val subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+    this.consumers(0).unsubscribe()
+    assertEquals(0, this.consumers(0).assignment().size)
+  }
+
+  @Test
+  def testCommitMetadata() {
+    this.consumers(0).assign(List(tp).asJava)
+
+    // sync commit
+    val syncMetadata = new OffsetAndMetadata(5, "foo")
+    this.consumers(0).commitSync(Map((tp, syncMetadata)).asJava)
+    assertEquals(syncMetadata, this.consumers(0).committed(tp))
+
+    // async commit
+    val asyncMetadata = new OffsetAndMetadata(10, "bar")
+    val callback = new CountConsumerCommitCallback
+    this.consumers(0).commitAsync(Map((tp, asyncMetadata)).asJava, callback)
+    awaitCommitCallback(this.consumers(0), callback)
+
+    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+  }
+
+  @Test
+  def testExpandingTopicSubscriptions() {
+    val otherTopic = "other"
+    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+    val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+    this.consumers(0).subscribe(List(topic).asJava)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+
+    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
+    this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == expandedSubscriptions.asJava
+    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+  }
+
+  @Test
+  def testShrinkingTopicSubscriptions() {
+    val otherTopic = "other"
+    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
+    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+    val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+    this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+
+    this.consumers(0).subscribe(List(topic).asJava)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == shrunkenSubscriptions.asJava
+    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+  }
+
+  @Test
+  def testPartitionsFor() {
+    val numParts = 2
+    TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
+    val parts = this.consumers(0).partitionsFor("part-test")
+    assertNotNull(parts)
+    assertEquals(2, parts.size)
+    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+  }
+
+  @Test
+  def testSeek() {
+    val consumer = this.consumers(0)
+    val totalRecords = 50L
+    sendRecords(totalRecords.toInt)
+    consumer.assign(List(tp).asJava)
+
+    consumer.seekToEnd(tp)
+    assertEquals(totalRecords, consumer.position(tp))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp)
+    assertEquals(0, consumer.position(tp), 0)
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
+
+    val mid = totalRecords / 2
+    consumer.seek(tp, mid)
+    assertEquals(mid, consumer.position(tp))
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+  }
+
+  def testPositionAndCommit() {
+    sendRecords(5)
+
+    // committed() on a partition with no committed offset throws an exception
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(new TopicPartition(topic, 15))
+    }
+
+    // position() on a partition that we aren't subscribed to throws an exception
+    intercept[IllegalArgumentException] {
+      this.consumers(0).position(new TopicPartition(topic, 15))
+    }
+
+    this.consumers(0).assign(List(tp).asJava)
+
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
+    this.consumers(0).commitSync()
+    assertEquals(0L, this.consumers(0).committed(tp).offset)
+
+    consumeAndVerifyRecords(this.consumers(0), 5, 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
+    this.consumers(0).commitSync()
+    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
+
+    sendRecords(1)
+
+    // another consumer in the same group should get the same position
+    this.consumers(1).assign(List(tp).asJava)
+    consumeAndVerifyRecords(this.consumers(1), 1, 5)
+  }
+
+  @Test
+  def testPartitionPauseAndResume() {
+    sendRecords(5)
+    this.consumers(0).assign(List(tp).asJava)
+    consumeAndVerifyRecords(this.consumers(0), 5, 0)
+    this.consumers(0).pause(tp)
+    sendRecords(5)
+    assertTrue(this.consumers(0).poll(0).isEmpty)
+    this.consumers(0).resume(tp)
+    consumeAndVerifyRecords(this.consumers(0), 5, 5)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 649c927..7990020 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -90,11 +90,10 @@ class QuotasTest extends KafkaServerTestHarness {
 
     // Create consumers
     val consumerProps = new Properties
-    consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
     consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
     consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                       classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
index 9575fda..14ceb43 100644
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -12,52 +12,70 @@
   */
 package kafka.api
 
-import java.io.{BufferedReader, FileWriter, BufferedWriter, File}
+import java.io.{FileWriter, BufferedWriter, File}
 import javax.security.auth.login.Configuration
 
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.kerberos.LoginManager
 import org.junit.{After, Before}
 
 trait SaslTestHarness extends ZooKeeperTestHarness {
-  val workDir = new File(System.getProperty("test.dir", "target"))
-  val kdcConf = MiniKdc.createConf()
-  val kdc = new MiniKdc(kdcConf, workDir)
+
+  private val workDir = new File(System.getProperty("test.dir", "target"))
+  private val kdcConf = MiniKdc.createConf()
+  private val kdc = new MiniKdc(kdcConf, workDir)
 
   @Before
   override def setUp() {
-    // Clean-up global configuration set by other tests
+    // Important if tests leak consumers, producers or brokers
+    LoginManager.closeAll()
+    val keytabFile = createKeytabAndSetConfiguration("kafka_jaas.conf")
+    kdc.start()
+    kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+    super.setUp
+  }
+
+  protected def createKeytabAndSetConfiguration(jaasResourceName: String): File = {
+    val (keytabFile, jaasFile) = createKeytabAndJaasFiles(jaasResourceName)
+    // This will cause a reload of the Configuration singleton when `getConfiguration` is called
     Configuration.setConfiguration(null)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
+    keytabFile
+  }
+
+  private def createKeytabAndJaasFiles(jaasResourceName: String): (File, File) = {
     val keytabFile = TestUtils.tempFile()
     val jaasFile = TestUtils.tempFile()
 
     val writer = new BufferedWriter(new FileWriter(jaasFile))
-    val source = io.Source.fromInputStream(
-      Thread.currentThread().getContextClassLoader.getResourceAsStream("kafka_jaas.conf"), "UTF-8")
-    if (source == null)
-      throw new IllegalStateException("Could not load `kaas_jaas.conf`, make sure it is in the classpath")
+    val inputStream = Thread.currentThread().getContextClassLoader.getResourceAsStream(jaasResourceName)
+    if (inputStream == null)
+      throw new IllegalStateException(s"Could not find `$jaasResourceName`, make sure it is in the classpath")
+    val source = io.Source.fromInputStream(inputStream, "UTF-8")
 
     for (line <- source.getLines) {
       val replaced = line.replaceAll("\\$keytab-location", keytabFile.getAbsolutePath)
       writer.write(replaced)
       writer.newLine()
     }
+
     writer.close()
     source.close()
 
-    kdc.start()
-    kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
-    super.setUp
+    (keytabFile, jaasFile)
   }
 
   @After
   override def tearDown() {
     super.tearDown
     kdc.stop()
+    // Important if tests leak consumers, producers or brokers
+    LoginManager.closeAll()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     Configuration.setConfiguration(null)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 26b86f7..e9784e0 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -52,8 +52,6 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
 
   def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
 
-  def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
-
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
   protected def trustStoreFile: Option[File] = None