Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/28 20:59:23 UTC

[2/3] kafka git commit: KAFKA-5746; Add new metrics to support health checks (KIP-188)
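Most of the hunks in this part add Map<Errors, Integer> errorCounts() overrides to the client response classes so that per-error-code request metrics (KIP-188) can be recorded. They rely on helpers on AbstractResponse -- errorCounts(Errors), errorCounts(Map<TopicPartition, Errors>) and updateErrorCounts(...) -- defined in another part of this multi-part commit. A minimal sketch of that helper pattern, inferred from the call sites below (the exact implementation is an assumption):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;

    // Hedged sketch of the AbstractResponse helpers used by the overrides below.
    public abstract class AbstractResponse {

        // Each response reports how many times each error code occurred in it.
        public abstract Map<Errors, Integer> errorCounts();

        // Single-error responses: the whole response carries exactly one error code.
        protected Map<Errors, Integer> errorCounts(Errors error) {
            return Collections.singletonMap(error, 1);
        }

        // Per-partition responses: count one occurrence per partition error.
        protected Map<Errors, Integer> errorCounts(Map<TopicPartition, Errors> errors) {
            Map<Errors, Integer> errorCounts = new HashMap<>();
            for (Errors error : errors.values())
                updateErrorCounts(errorCounts, error);
            return errorCounts;
        }

        protected void updateErrorCounts(Map<Errors, Integer> errorCounts, Errors error) {
            Integer count = errorCounts.get(error);
            errorCounts.put(error, count == null ? 1 : count + 1);
        }
    }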

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 8799ad7..9ecb21f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -88,6 +89,11 @@ public class InitProducerIdResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public short epoch() {
         return epoch;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index a4431b9..56491eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -147,6 +147,11 @@ public class JoinGroupResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public int generationId() {
         return generationId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 39b8c37..921c8ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -87,6 +87,11 @@ public class LeaderAndIsrResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
         return new LeaderAndIsrResponse(ApiKeys.LEADER_AND_ISR.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index bef21e9..f8682ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -73,6 +74,11 @@ public class LeaveGroupResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
         struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 8f48f39..afc5ebd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -97,6 +98,11 @@ public class ListGroupsResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static class Group {
         private final String groupId;
         private final String protocolType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 732fb49..13f2dfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -191,6 +191,14 @@ public class ListOffsetResponse extends AbstractResponse {
         return responseData;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (PartitionData response : responseData.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
+
     public static ListOffsetResponse parse(ByteBuffer buffer, short version) {
         return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index fb69cef..99c4ffb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -294,6 +294,14 @@ public class MetadataResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (TopicMetadata metadata : topicMetadata)
+            updateErrorCounts(errorCounts, metadata.error);
+        return errorCounts;
+    }
+
     /**
      * Returns the set of topics with the specified error
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 13484ed..b439034 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -145,6 +145,11 @@ public class OffsetCommitResponse extends AbstractResponse {
         return responseData;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(responseData);
+    }
+
     public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
         return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index c3341e0..4d069fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -189,6 +189,11 @@ public class OffsetFetchResponse extends AbstractResponse {
         return this.error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 13d70b7..4a91533 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -82,6 +82,14 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
         return epochEndOffsetsByPartition;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (EpochEndOffset response : epochEndOffsetsByPartition.values())
+            updateErrorCounts(errorCounts, response.error());
+        return errorCounts;
+    }
+
     public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId) {
         return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fbc7f76..ee4a2e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -334,6 +335,12 @@ public class ProduceRequest extends AbstractRequest {
         }
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts(Throwable e) {
+        Errors error = Errors.forException(e);
+        return Collections.singletonMap(error, partitions().size());
+    }
+
     private Collection<TopicPartition> partitions() {
         return partitionSizes.keySet();
     }
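ProduceRequest gets the one request-side override in this part: when the whole request fails with a single exception, every partition in the request is attributed to the matching error code. A small illustration (the exception class is chosen for the example only):

    // Illustration: a NotEnoughReplicasException on a request covering 3 partitions
    // would be reported as {NOT_ENOUGH_REPLICAS: 3} by errorCounts(Throwable).
    Errors error = Errors.forException(new NotEnoughReplicasException("not enough in-sync replicas"));
    Map<Errors, Integer> counts = Collections.singletonMap(error, 3); // partitions().size() == 3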

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index e1978dd..afedc9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -233,6 +233,14 @@ public class ProduceResponse extends AbstractResponse {
         return this.throttleTime;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (PartitionResponse response : responses.values())
+            updateErrorCounts(errorCounts, response.error);
+        return errorCounts;
+    }
+
     public static final class PartitionResponse {
         public Errors error;
         public long baseOffset;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index c950cb9..e244182 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -85,6 +86,11 @@ public class SaslAuthenticateResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
         struct.set(ERROR_CODE, error.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index c9f6369..252c62d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
@@ -76,6 +77,11 @@ public class SaslHandshakeResponse extends AbstractResponse {
     }
 
     @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
+    @Override
     public Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
         struct.set(ERROR_CODE, error.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 4196b83..8ad6222 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -84,6 +84,11 @@ public class StopReplicaResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static StopReplicaResponse parse(ByteBuffer buffer, short version) {
         return new StopReplicaResponse(ApiKeys.STOP_REPLICA.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index d68b2cd..77f9512 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -82,6 +83,11 @@ public class SyncGroupResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public ByteBuffer memberAssignment() {
         return memberState;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 53804d9..ff0f8ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -125,6 +125,11 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return errors;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(errors);
+    }
+
     public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
         return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 9ff8e27..4c21cde 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
@@ -56,6 +57,11 @@ public class UpdateMetadataResponse extends AbstractResponse {
         return error;
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(error);
+    }
+
     public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
         return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 797fb59..f4bf157 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -150,6 +150,16 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
         return errors.get(producerId);
     }
 
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        for (Map<TopicPartition, Errors> allErrors : errors.values()) {
+            for (Errors error : allErrors.values())
+                updateErrorCounts(errorCounts, error);
+        }
+        return errorCounts;
+    }
+
     public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
         return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index fe57d27..739e0cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -113,7 +113,7 @@ public class SaslServerAuthenticator implements Authenticator {
     // Next SASL state to be set when outgoing writes associated with the current SASL state complete
     private SaslState pendingSaslState = null;
     // Exception that will be thrown by `authenticate()` when SaslState is set to FAILED after outbound writes complete
-    private IOException pendingException = null;
+    private AuthenticationException pendingException = null;
     private SaslServer saslServer;
     private String saslMechanism;
     private AuthCallbackHandler callbackHandler;
@@ -272,8 +272,15 @@ public class SaslServerAuthenticator implements Authenticator {
                     default:
                         break;
                 }
+            } catch (SaslException | AuthenticationException e) {
+                // Exception will be propagated after response is sent to client
+                AuthenticationException authException = (e instanceof AuthenticationException) ?
+                        (AuthenticationException) e : new AuthenticationException("SASL authentication failed", e);
+                setSaslState(SaslState.FAILED, authException);
             } catch (Exception e) {
-                setSaslState(SaslState.FAILED, new IOException(e));
+                // In the case of IOExceptions and other unexpected exceptions, fail immediately
+                saslState = SaslState.FAILED;
+                throw e;
             }
         }
     }
@@ -303,7 +310,7 @@ public class SaslServerAuthenticator implements Authenticator {
         setSaslState(saslState, null);
     }
 
-    private void setSaslState(SaslState saslState, IOException exception) throws IOException {
+    private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
         if (netOutBuffer != null && !netOutBuffer.completed()) {
             pendingSaslState = saslState;
             pendingException = exception;
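The SaslServerAuthenticator change above makes authenticate() surface authentication failures as AuthenticationException (after the error response has been written back to the client) instead of wrapping everything in IOException; genuine I/O problems are now rethrown immediately. The SaslServerAuthenticatorTest hunk further down reflects the new contract; a hypothetical caller sketch (the log variable is assumed):

    try {
        authenticator.authenticate();
    } catch (AuthenticationException e) {
        // Definite authentication verdict, e.g. IllegalSaslStateException:
        // counted by the new failed-authentication metrics and reported to the client.
        log.info("SASL authentication failed", e);
    } catch (IOException e) {
        // Transport-level problem, not an authentication failure.
        log.debug("I/O error during SASL authentication", e);
    }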

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 25e09e1..6053da6 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -24,6 +24,10 @@ import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,27 +55,49 @@ public class AppInfoParser {
         return COMMIT_ID;
     }
 
-    public static synchronized void registerAppInfo(String prefix, String id) {
+    public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) {
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
             AppInfo mBean = new AppInfo();
             ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+
+            registerMetrics(metrics); // prefix will be added later by JmxReporter
         } catch (JMException e) {
             log.warn("Error registering AppInfo mbean", e);
         }
     }
 
-    public static synchronized void unregisterAppInfo(String prefix, String id) {
+    public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) {
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
             if (server.isRegistered(name))
                 server.unregisterMBean(name);
+
+            unregisterMetrics(metrics);
         } catch (JMException e) {
             log.warn("Error unregistering AppInfo mbean", e);
         }
     }
 
+    private static MetricName metricName(Metrics metrics, String name) {
+        return metrics.metricName(name, "app-info", "Metric indicating " + name);
+    }
+
+    private static void registerMetrics(Metrics metrics) {
+        if (metrics != null) {
+            metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(VERSION));
+            metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(COMMIT_ID));
+        }
+    }
+
+    private static void unregisterMetrics(Metrics metrics) {
+        if (metrics != null) {
+            metrics.removeMetric(metricName(metrics, "version"));
+            metrics.removeMetric(metricName(metrics, "commit-id"));
+        }
+    }
+
     public interface AppInfoMBean {
         public String getVersion();
         public String getCommitId();
@@ -95,4 +121,17 @@ public class AppInfoParser {
         }
 
     }
+
+    static class ImmutableValue<T> implements Gauge<T> {
+        private final T value;
+
+        public ImmutableValue(T value) {
+            this.value = value;
+        }
+
+        @Override
+        public T value(MetricConfig config, long now) {
+            return value;
+        }
+    }
 }
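With the extra Metrics parameter, AppInfoParser now exposes version and commit-id as app-info gauges alongside the existing JMX MBean, and removes them again on shutdown. A usage sketch (the prefix and clientId are illustrative):

    // Sketch: register/unregister the app-info gauges around a client's lifecycle.
    Metrics metrics = new Metrics();
    AppInfoParser.registerAppInfo("kafka.producer", clientId, metrics);   // adds version and commit-id gauges
    // ... run the client ...
    AppInfoParser.unregisterAppInfo("kafka.producer", clientId, metrics); // removes them again
    metrics.close();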

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5263d3b..cab385c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -109,6 +109,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class FetcherTest {
     private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
     private String topicName = "test";

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index afccee0..3a92e63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -240,6 +240,7 @@ public class SenderTest {
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
     @Test
+    @SuppressWarnings("deprecation")
     public void testQuotaMetrics() throws Exception {
         MockSelector selector = new MockSelector(time);
         Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@@ -1619,6 +1620,7 @@ public class SenderTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
@@ -1711,6 +1713,7 @@ public class SenderTest {
         testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
     }
 
+    @SuppressWarnings("deprecation")
     private void testSplitBatchAndSend(TransactionManager txnManager,
                                        ProducerIdAndEpoch producerIdAndEpoch,
                                        TopicPartition tp) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index e24c3d7..55f8e23 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -46,6 +46,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class MetricsTest {
 
     private static final double EPS = 0.000001;

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index 9db18e2..9b6f686 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -102,6 +102,7 @@ public class FrequenciesTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testUseWithMetrics() {
         MetricName name1 = name("1");
         MetricName name2 = name("2");
@@ -156,4 +157,4 @@ public class FrequenciesTest {
         return new Frequency(name(name), value);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 190fa3d..8d510f5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -16,7 +16,11 @@
  */
 package org.apache.kafka.common.network;
 
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -24,6 +28,8 @@ import org.apache.kafka.common.security.scram.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.ScramMechanism;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -36,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
@@ -43,6 +50,9 @@ import java.util.List;
  *
  */
 public class NioEchoServer extends Thread {
+
+    private static final double EPS = 0.0001;
+
     private final int port;
     private final ServerSocketChannel serverSocketChannel;
     private final List<SocketChannel> newChannels;
@@ -51,6 +61,7 @@ public class NioEchoServer extends Thread {
     private final Selector selector;
     private volatile WritableByteChannel outputChannel;
     private final CredentialCache credentialCache;
+    private final Metrics metrics;
 
     public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
             String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -67,7 +78,8 @@ public class NioEchoServer extends Thread {
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
         if (channelBuilder == null)
             channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+        this.metrics = new Metrics();
+        this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         acceptorThread = new AcceptorThread();
     }
 
@@ -79,6 +91,43 @@ public class NioEchoServer extends Thread {
         return credentialCache;
     }
 
+    @SuppressWarnings("deprecation")
+    public double metricValue(String name) {
+        for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
+            if (entry.getKey().name().equals(name))
+                return entry.getValue().value();
+        }
+        throw new IllegalStateException("Metric not found, " + name + ", found=" + metrics.metrics().keySet());
+    }
+
+    public void verifyAuthenticationMetrics(int successfulAuthentications, final int failedAuthentications)
+            throws InterruptedException {
+        waitForMetric("successful-authentication", successfulAuthentications);
+        waitForMetric("failed-authentication", failedAuthentications);
+    }
+
+    private void waitForMetric(String name, final double expectedValue) throws InterruptedException {
+        final String totalName = name + "-total";
+        final String rateName = name + "-rate";
+        if (expectedValue == 0.0) {
+            assertEquals(expectedValue, metricValue(totalName), EPS);
+            assertEquals(expectedValue, metricValue(rateName), EPS);
+        } else {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return Math.abs(metricValue(totalName) - expectedValue) <= EPS;
+                }
+            }, "Metric not updated " + totalName);
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return metricValue(rateName) > 0.0;
+                }
+            }, "Metric not updated " + rateName);
+        }
+    }
+
     @Override
     public void run() {
         try {
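NioEchoServer now keeps its Metrics instance and exposes verifyAuthenticationMetrics(), which the SSL and SASL tests below use to assert the new successful-authentication and failed-authentication counts. In condensed form the new assertions look like this (a sketch stitched together from the test hunks below):

    // Successful handshake: one success, no failures so far.
    NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
    server.verifyAuthenticationMetrics(1, 0);

    // A later failed handshake closes the channel and bumps the failure count.
    NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
    server.verifyAuthenticationMetrics(1, 1);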

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 8894873..be4dbc7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -506,7 +506,7 @@ public class SelectorTest {
         // record void method invocations
         kafkaChannel.disconnect();
         kafkaChannel.close();
-        expect(kafkaChannel.ready()).andReturn(false);
+        expect(kafkaChannel.ready()).andReturn(false).anyTimes();
         // prepare throws an exception
         kafkaChannel.prepare();
         expectLastCall().andThrow(new IOException());

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 90c8cd5..440140b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -105,6 +105,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -230,6 +231,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -323,6 +325,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -343,6 +346,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -495,6 +499,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -512,6 +517,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -530,6 +536,7 @@ public class SslTransportLayerTest {
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index b41db67..6df7c2d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -47,10 +49,12 @@ public class FileRecordsTest {
             "ijkl".getBytes()
     };
     private FileRecords fileRecords;
+    private Time time;
 
     @Before
     public void setup() throws IOException {
         this.fileRecords = createFileRecords(values);
+        this.time = new MockTime();
     }
 
     /**
@@ -310,7 +314,7 @@ public class FileRecordsTest {
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
         int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
+        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
         assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
     }
@@ -362,7 +366,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
+            Records convertedRecords = fileRecords.downConvert(toMagic, 0L, time).records();
             verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
 
             if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
@@ -371,7 +375,7 @@ public class FileRecordsTest {
                     firstOffset = 11L; // v1 record
                 else
                     firstOffset = 17; // v2 record
-                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset);
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset, time).records();
                 List<Long> filteredOffsets = new ArrayList<>(offsets);
                 List<SimpleRecord> filteredRecords = new ArrayList<>(records);
                 int index = filteredOffsets.indexOf(firstOffset) - 1;
@@ -380,7 +384,7 @@ public class FileRecordsTest {
                 verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
             } else {
                 // firstOffset doesn't have any effect in this case
-                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L);
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L, time).records();
                 verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
             }
         }
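The downConvert changes in the record tests reflect a new signature that takes a Time instance and returns a ConvertedRecords wrapper, bundling the converted records with the processing stats behind the new conversion-time and temporary-memory metrics. A caller sketch based on the call sites in these tests (variable names are illustrative):

    // Sketch: callers now unwrap ConvertedRecords instead of receiving Records directly.
    ConvertedRecords<MemoryRecords> converted =
            MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
    MemoryRecords downConverted = converted.records();
    RecordsProcessingStats stats = converted.recordsProcessingStats(); // conversion count, time, temp memory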

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index cc2bf79..4df3492 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -38,10 +40,12 @@ public class MemoryRecordsBuilderTest {
 
     private final CompressionType compressionType;
     private final int bufferOffset;
+    private final Time time;
 
     public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
         this.bufferOffset = bufferOffset;
         this.compressionType = compressionType;
+        this.time = Time.SYSTEM;
     }
 
     @Test
@@ -456,7 +460,11 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+        ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+        MemoryRecords records = convertedRecords.records();
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
+                3, 2, records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -493,7 +501,11 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+        ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+        MemoryRecords records = convertedRecords.records();
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+                records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -517,7 +529,10 @@ public class MemoryRecordsBuilderTest {
         assertEquals("2", utf8(logRecords.get(1).key()));
         assertEquals("3", utf8(logRecords.get(2).key()));
 
-        records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L);
+        convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L, time);
+        records = convertedRecords.records();
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+                records.sizeInBytes(), buffer.limit());
 
         batches = Utils.toList(records.batches().iterator());
         logRecords = Utils.toList(records.records().iterator());
@@ -619,4 +634,26 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
+    private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int recordCount, int convertedCount,
+            long finalBytes, long preConvertedBytes) {
+        assertNotNull("Records processing info is null", processingStats);
+        assertEquals(convertedCount, processingStats.conversionCount());
+        assertTrue("Processing time not recorded", processingStats.conversionTimeNanos() > 0);
+        long tempBytes = processingStats.temporaryMemoryBytes();
+        if (compressionType == CompressionType.NONE) {
+            if (convertedCount == 0)
+                assertEquals(finalBytes, tempBytes);
+            else if (convertedCount == recordCount)
+                assertEquals(preConvertedBytes + finalBytes, tempBytes);
+            else {
+                assertTrue(String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes),
+                        tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes);
+            }
+        } else {
+            long compressedBytes = finalBytes - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
+            assertTrue(String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes),
+                    tempBytes > compressedBytes);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c59f2c9..c2c8d81 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -126,6 +126,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -139,6 +140,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnection(securityProtocol, node);
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -153,6 +155,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -167,6 +170,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -268,6 +272,7 @@ public class SaslAuthenticatorTest {
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnection(securityProtocol, "0");
+        server.verifyAuthenticationMetrics(1, 0);
     }
 
     /**
@@ -303,6 +308,7 @@ public class SaslAuthenticatorTest {
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -321,6 +327,7 @@ public class SaslAuthenticatorTest {
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -338,9 +345,11 @@ public class SaslAuthenticatorTest {
         String node = "1";
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
 
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
         createAndCheckClientConnection(securityProtocol, "2");
+        server.verifyAuthenticationMetrics(1, 1);
     }
 
     /**
@@ -643,6 +652,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**
@@ -657,6 +667,7 @@ public class SaslAuthenticatorTest {
 
         server = createEchoServer(securityProtocol);
         createAndCheckClientConnectionFailure(securityProtocol, node);
+        server.verifyAuthenticationMetrics(0, 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 9d8be8d..4abff84 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.kafka.common.security.scram.ScramMechanism.SCRAM_SHA_256;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SaslServerAuthenticatorTest {
@@ -100,8 +99,8 @@ public class SaslServerAuthenticatorTest {
         try {
             authenticator.authenticate();
             fail("Expected authenticate() to raise an exception");
-        } catch (IOException e) {
-            assertTrue(e.getCause() instanceof IllegalSaslStateException);
+        } catch (IllegalSaslStateException e) {
+            // expected exception
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 8c8ba6d..eb01337 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -73,7 +73,7 @@ public class ConnectMetrics {
         reporters.add(new JmxReporter(JMX_PREFIX));
         this.metrics = new Metrics(metricConfig, reporters, time);
         LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
-        AppInfoParser.registerAppInfo(JMX_PREFIX, workerId);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics);
     }
 
     /**
@@ -164,7 +164,7 @@ public class ConnectMetrics {
     public void stop() {
         metrics.close();
         LOG.debug("Unregistering Connect metrics with JMX for worker '{}'", workerId);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId, metrics);
     }
 
     public static class MetricGroupId {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d61a29c..6e19d11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -133,7 +133,7 @@ public class WorkerGroupMember {
                     configStorage,
                     listener);
 
-            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
             log.debug("Connect group member created");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
@@ -200,7 +200,7 @@ public class WorkerGroupMember {
         ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to stop the Connect group member", firstException.get());
         else

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index bd193c7..306d64a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -22,7 +22,7 @@ import joptsimple._
 import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.security.JaasUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5db2c5d..4ea61ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -964,12 +964,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   private def registerLogDirEventNotificationListener() = {
     debug("Registering logDirEventNotificationListener")
-    zkUtils.zkClient.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+    zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
   }
 
   private def deregisterLogDirEventNotificationListener() = {
     debug("De-registering logDirEventNotificationListener")
-    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+    zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
   }
 
   private def readControllerEpochFromZookeeper() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f32da4b..e6c774a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,11 +48,11 @@ import java.util.regex.Pattern
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+      RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -65,6 +65,7 @@ object LogAppendInfo {
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
  * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
  * @param logStartOffset The start offset of the log at the time of this append.
+ * @param recordsProcessingStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
  * @param shallowCount The number of shallow messages
@@ -77,6 +78,7 @@ case class LogAppendInfo(var firstOffset: Long,
                          var offsetOfMaxTimestamp: Long,
                          var logAppendTime: Long,
                          var logStartOffset: Long,
+                         var recordsProcessingStats: RecordsProcessingStats,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -617,6 +619,7 @@ class Log(@volatile var dir: File,
           val validateAndOffsetAssignResult = try {
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
               offset,
+              time,
               now,
               appendInfo.sourceCodec,
               appendInfo.targetCodec,
@@ -633,6 +636,7 @@ class Log(@volatile var dir: File,
           appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
           appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
           appendInfo.lastOffset = offset.value - 1
+          appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
             appendInfo.logAppendTime = now
 
@@ -868,8 +872,8 @@ class Log(@volatile var dir: File,
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, sourceCodec,
-      targetCodec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
+      RecordsProcessingStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   private def updateProducers(batch: RecordBatch,

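Editor's note: the extra time parameter threaded into LogValidator.validateMessagesAndAssignOffsets exists so the validator can measure how long offset assignment and format conversion take; that duration travels back in the recordsProcessingStats field added to LogAppendInfo above. A standalone sketch of the timing pattern, assuming only the public org.apache.kafka.common.utils.Time API:

    import org.apache.kafka.common.utils.Time

    object ConversionTiming {
      // A Time instance is passed in (so tests can substitute a mock clock) and the elapsed
      // work is reported in nanoseconds, as the validator changes below do.
      def timed[T](time: Time)(work: => T): (T, Long) = {
        val startNanos = time.nanoseconds
        val result = work
        (result, time.nanoseconds - startNanos)
      }

      def main(args: Array[String]): Unit = {
        val (sum, elapsedNanos) = timed(Time.SYSTEM)((1 to 1000).sum)
        println(s"sum=$sum took ${elapsedNanos}ns")
      }
    }
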
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b6b20e3..15750e9 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -23,6 +23,7 @@ import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -46,6 +47,7 @@ private[kafka] object LogValidator extends Logging {
    */
   private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
                                                       offsetCounter: LongRef,
+                                                      time: Time,
                                                       now: Long,
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
@@ -58,14 +60,14 @@ private[kafka] object LogValidator extends Logging {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
-        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
+        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType,
           timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
           partitionLeaderEpoch, isFromClient, magic)
     } else {
-      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
+      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
         magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
     }
   }
@@ -109,6 +111,7 @@ private[kafka] object LogValidator extends Logging {
   private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
                                                    offsetCounter: LongRef,
                                                    compactedTopic: Boolean,
+                                                   time: Time,
                                                    now: Long,
                                                    timestampType: TimestampType,
                                                    timestampDiffMaxMs: Long,
@@ -137,12 +140,16 @@ private[kafka] object LogValidator extends Logging {
     }
 
     val convertedRecords = builder.build()
+
     val info = builder.info
+    val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
+      builder.numRecords, time.nanoseconds - now)
     ValidationAndOffsetAssignResult(
       validatedRecords = convertedRecords,
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
-      messageSizeMaybeChanged = true)
+      messageSizeMaybeChanged = true,
+      recordsProcessingStats = recordsProcessingStats)
   }
 
   private def assignOffsetsNonCompressed(records: MemoryRecords,
@@ -203,7 +210,8 @@ private[kafka] object LogValidator extends Logging {
       validatedRecords = records,
       maxTimestamp = maxTimestamp,
       shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
-      messageSizeMaybeChanged = false)
+      messageSizeMaybeChanged = false,
+      recordsProcessingStats = RecordsProcessingStats.EMPTY)
   }
 
   /**
@@ -215,6 +223,7 @@ private[kafka] object LogValidator extends Logging {
    */
   def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                                  offsetCounter: LongRef,
+                                                 time: Time,
                                                  now: Long,
                                                  sourceCodec: CompressionCodec,
                                                  targetCodec: CompressionCodec,
@@ -232,8 +241,11 @@ private[kafka] object LogValidator extends Logging {
       val expectedInnerOffset = new LongRef(0)
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
+      var uncompressedSizeInBytes = 0
+
       for (batch <- records.batches.asScala) {
         validateBatch(batch, isFromClient, toMagic)
+        uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
         // Do not compress control records unless they are written compressed
         if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
@@ -244,6 +256,8 @@ private[kafka] object LogValidator extends Logging {
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
               s"compression attribute set: $record")
+
+          uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
@@ -269,8 +283,9 @@ private[kafka] object LogValidator extends Logging {
           val first = records.batches.asScala.head
           (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
         }
-        buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
-          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
+        buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+          uncompressedSizeInBytes)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -287,15 +302,18 @@ private[kafka] object LogValidator extends Logging {
         if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
           batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
+        val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes, 0, -1)
         ValidationAndOffsetAssignResult(validatedRecords = records,
           maxTimestamp = maxTimestamp,
           shallowOffsetOfMaxTimestamp = lastOffset,
-          messageSizeMaybeChanged = false)
+          messageSizeMaybeChanged = false,
+          recordsProcessingStats = recordsProcessingStats)
       }
   }
 
   private def buildRecordsAndAssignOffsets(magic: Byte,
                                            offsetCounter: LongRef,
+                                           time: Time,
                                            timestampType: TimestampType,
                                            compressionType: CompressionType,
                                            logAppendTime: Long,
@@ -304,7 +322,10 @@ private[kafka] object LogValidator extends Logging {
                                            producerEpoch: Short,
                                            baseSequence: Int,
                                            isTransactional: Boolean,
-                                           partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+                                           partitionLeaderEpoch: Int,
+                                           isFromClient: Boolean,
+                                           uncompressedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
+    val startNanos = time.nanoseconds
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
       validatedRecords.asJava)
     val buffer = ByteBuffer.allocate(estimatedSize)
@@ -316,13 +337,23 @@ private[kafka] object LogValidator extends Logging {
     }
 
     val records = builder.build()
+
     val info = builder.info
 
+    // This is not strictly correct, it represents the number of records where in-place assignment is not possible
+    // instead of the number of records that were converted. It will over-count cases where the source and target are
+    // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
+    // to rebuild the records (including recompression if enabled).
+    val conversionCount = builder.numRecords
+    val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes + builder.uncompressedBytesWritten,
+      conversionCount, time.nanoseconds - startNanos)
+
     ValidationAndOffsetAssignResult(
       validatedRecords = records,
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
-      messageSizeMaybeChanged = true)
+      messageSizeMaybeChanged = true,
+      recordsProcessingStats = recordsProcessingStats)
   }
 
   private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -352,6 +383,7 @@ private[kafka] object LogValidator extends Logging {
   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
                                              maxTimestamp: Long,
                                              shallowOffsetOfMaxTimestamp: Long,
-                                             messageSizeMaybeChanged: Boolean)
+                                             messageSizeMaybeChanged: Boolean,
+                                             recordsProcessingStats: RecordsProcessingStats)
 
 }

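Editor's note: the constructor calls above imply the shape of RecordsProcessingStats: temporary memory in bytes, number of records converted, and conversion time in nanoseconds, with RecordsProcessingStats.EMPTY standing in when no conversion work was done. A small hedged sketch with made-up values, assuming the class lives in org.apache.kafka.common.record alongside the other record classes touched by this commit:

    import org.apache.kafka.common.record.RecordsProcessingStats

    object StatsExamples {
      // In-place validation: no conversion happened, nothing to report.
      val none: RecordsProcessingStats = RecordsProcessingStats.EMPTY

      // Hypothetical values: 4096 bytes of temporary memory, 100 records rebuilt,
      // 1.5 ms (1,500,000 ns) spent converting.
      val converted = new RecordsProcessingStats(4096, 100, 1500000L)
    }
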
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 1894213..b9ab486 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,19 +40,22 @@ trait KafkaMetricsGroup extends Logging {
    * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
+  protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-    // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
-    def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
-    val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
 
-    explicitMetricName(pkg, simpleName, name, metricTags)
+    explicitMetricName(pkg, simpleName, name, tags)
   }
 
 
-  private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
+  protected def explicitMetricName(group: String, typeName: String, name: String,
+      tags: scala.collection.Map[String, String]): MetricName = {
+
+    // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
+    def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
+    val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
+
     val nameBuilder: StringBuilder = new StringBuilder
 
     nameBuilder.append(group)

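Editor's note: the refactor above moves the IPv6 tag quoting from metricName into explicitMetricName and widens both methods to protected, presumably so subclasses can reuse or customise the name construction while still producing valid JMX ObjectNames. The quoting helper's behaviour, as a standalone sketch:

    import javax.management.ObjectName

    object TagQuoting {
      // Mirrors the helper shown above: values containing ':' (e.g. IPv6 addresses) are
      // quoted so the resulting JMX ObjectName stays parseable.
      def quoteIfRequired(value: String): String =
        if (value.contains(':')) ObjectName.quote(value) else value

      def main(args: Array[String]): Unit = {
        println(quoteIfRequired("2001:db8::1")) // printed quoted
        println(quoteIfRequired("client-1"))    // printed unchanged
      }
    }
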
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index b2ef615..e5f115c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,19 +21,20 @@ import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.concurrent._
 
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Sanitizer
 import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
 import org.apache.log4j.Logger
 
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
@@ -60,6 +61,8 @@ object RequestChannel extends Logging {
     @volatile var responseCompleteTimeNanos = -1L
     @volatile var responseDequeueTimeNanos = -1L
     @volatile var apiRemoteCompleteTimeNanos = -1L
+    @volatile var messageConversionsTimeNanos = 0L
+    @volatile var temporaryMemoryBytes = 0L
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
@@ -121,6 +124,7 @@ object RequestChannel extends Logging {
       val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
       val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
       val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+      val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
       val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
         if (header.apiKey == ApiKeys.FETCH) {
@@ -133,7 +137,7 @@ object RequestChannel extends Logging {
         else Seq.empty
       val metricNames = fetchMetricNames :+ header.apiKey.name
       metricNames.foreach { metricName =>
-        val m = RequestMetrics.metricsMap(metricName)
+        val m = RequestMetrics(metricName)
         m.requestRate.mark()
         m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
         m.localTimeHist.update(Math.round(apiLocalTimeMs))
@@ -142,6 +146,9 @@ object RequestChannel extends Logging {
         m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
         m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
         m.totalTimeHist.update(Math.round(totalTimeMs))
+        m.requestBytesHist.update(sizeOfBodyInBytes)
+        m.messageConversionsTimeHist.foreach(_.update(Math.round(messageConversionsTimeMs)))
+        m.tempMemoryBytesHist.foreach(_.update(temporaryMemoryBytes))
       }
 
       // Records network handler thread usage. This is included towards the request quota for the
@@ -171,6 +178,10 @@ object RequestChannel extends Logging {
           .append(",securityProtocol:").append(context.securityProtocol)
           .append(",principal:").append(session.principal)
           .append(",listener:").append(context.listenerName.value)
+        if (temporaryMemoryBytes > 0)
+          builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes)
+        if (messageConversionsTimeMs > 0)
+          builder.append(",messageConversionsTime:").append(messageConversionsTimeMs)
         requestLogger.debug(builder.toString)
       }
     }
@@ -281,6 +292,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     responseListeners ::= onResponse
   }
 
+  def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
+    errors.foreach { case (error, count) =>
+      RequestMetrics.markErrorMeter(apiKey.name, error, count)
+    }
+  }
+
   def shutdown() {
     requestQueue.clear()
   }
@@ -290,11 +307,30 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 }
 
 object RequestMetrics {
-  val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+
+  private val metricsMap = mutable.Map[String, RequestMetrics]()
+
   val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
   val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
-  (ApiKeys.values().toList.map(e => e.name)
-    ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+
+  (ApiKeys.values.toSeq.map(_.name) ++ Seq(consumerFetchMetricName, followFetchMetricName)).foreach { name =>
+    metricsMap.put(name, new RequestMetrics(name))
+  }
+
+  def apply(metricName: String) = metricsMap(metricName)
+
+  def markErrorMeter(name: String, error: Errors, count: Int) {
+    val errorMeter = metricsMap(name).errorMeters(error)
+    errorMeter.getOrCreateMeter().mark(count.toLong)
+  }
+
+  // Used for testing until these metrics are moved to a class
+  private[kafka] def clearErrorMeters(): Unit = {
+    metricsMap.values.foreach { requestMetrics =>
+      requestMetrics.errorMeters.values.foreach(_.removeMeter())
+    }
+  }
+
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
@@ -306,11 +342,58 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
   // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
   val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
-  // time a request is throttled (only relevant to fetch and produce requests)
+  // time a request is throttled
   val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
   // time a response spent in a response queue
   val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
   // time to send the response to the requester
   val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
   val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
+  // request size in bytes
+  val requestBytesHist = newHistogram("RequestBytes", biased = true, tags)
+  // time for message conversions (only relevant to fetch and produce requests)
+  val messageConversionsTimeHist =
+    if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+      Some(newHistogram("MessageConversionsTimeMs", biased = true, tags))
+    else
+      None
+  // Temporary memory allocated for processing request (only populated for fetch and produce requests)
+  // This shows the memory allocated for compression/conversions excluding the actual request size
+  val tempMemoryBytesHist =
+    if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+      Some(newHistogram("TemporaryMemoryBytes", biased = true, tags))
+    else
+      None
+
+  private val errorMeters = mutable.Map[Errors, ErrorMeter]()
+  Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
+
+  class ErrorMeter(name: String, error: Errors) {
+    private val tags = Map("request" -> name, "error" -> error.name)
+
+    @volatile private var meter: Meter = null
+
+    def getOrCreateMeter(): Meter = {
+      if (meter != null)
+        meter
+      else {
+        synchronized {
+          if (meter == null)
+             meter = newMeter("ErrorsPerSec", "requests", TimeUnit.SECONDS, tags)
+          meter
+        }
+      }
+    }
+
+    // This is currently used only in tests.
+    def removeMeter(): Unit = {
+      synchronized {
+        if (meter != null) {
+          removeMetric("ErrorsPerSec", tags)
+          meter = null
+        }
+      }
+    }
+
+  }
 }

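Editor's note: a meter exists for every (request type, error code) pair, so RequestMetrics creates them lazily through the @volatile double-checked pattern in ErrorMeter above; a meter (and its MBean) only appears once that error has actually been returned, and removeMeter lets tests reset the state. A hedged sketch of how a caller would feed a completed response into these meters via the updateErrorMetrics method added above, assuming the response exposes its per-error counts as a java.util.Map[Errors, Integer] as the response classes in this commit do:

    import scala.collection.JavaConverters._
    import kafka.network.RequestChannel
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.AbstractResponse

    object ErrorMeterUsage {
      // Sketch only: mark the per-API error meters with the error counts of one response.
      def recordErrors(channel: RequestChannel, apiKey: ApiKeys, response: AbstractResponse): Unit =
        channel.updateErrorMetrics(apiKey, response.errorCounts.asScala)
    }
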
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index fd31fc4..2c83c1b 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -25,7 +25,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c84fbcb..afaa5dd 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,8 +16,6 @@
  */
 package kafka.server
 
-import java.net.{URLEncoder, URLDecoder}
-import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock