You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/28 20:59:23 UTC
[2/3] kafka git commit: KAFKA-5746; Add new metrics to support health checks (KIP-188)
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 8799ad7..9ecb21f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -88,6 +89,11 @@ public class InitProducerIdResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public short epoch() {
return epoch;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index a4431b9..56491eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -147,6 +147,11 @@ public class JoinGroupResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public int generationId() {
return generationId;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 39b8c37..921c8ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -87,6 +87,11 @@ public class LeaderAndIsrResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
return new LeaderAndIsrResponse(ApiKeys.LEADER_AND_ISR.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index bef21e9..f8682ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -73,6 +74,11 @@ public class LeaveGroupResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.LEAVE_GROUP.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 8f48f39..afc5ebd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -97,6 +98,11 @@ public class ListGroupsResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static class Group {
private final String groupId;
private final String protocolType;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 732fb49..13f2dfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -191,6 +191,14 @@ public class ListOffsetResponse extends AbstractResponse {
return responseData;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (PartitionData response : responseData.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
+
public static ListOffsetResponse parse(ByteBuffer buffer, short version) {
return new ListOffsetResponse(ApiKeys.LIST_OFFSETS.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index fb69cef..99c4ffb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -294,6 +294,14 @@ public class MetadataResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (TopicMetadata metadata : topicMetadata)
+ updateErrorCounts(errorCounts, metadata.error);
+ return errorCounts;
+ }
+
/**
* Returns the set of topics with the specified error
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 13484ed..b439034 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -145,6 +145,11 @@ public class OffsetCommitResponse extends AbstractResponse {
return responseData;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(responseData);
+ }
+
public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index c3341e0..4d069fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -189,6 +189,11 @@ public class OffsetFetchResponse extends AbstractResponse {
return this.error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public Map<TopicPartition, PartitionData> responseData() {
return responseData;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 13d70b7..4a91533 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -82,6 +82,14 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
return epochEndOffsetsByPartition;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (EpochEndOffset response : epochEndOffsetsByPartition.values())
+ updateErrorCounts(errorCounts, response.error());
+ return errorCounts;
+ }
+
public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId) {
return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fbc7f76..ee4a2e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -334,6 +335,12 @@ public class ProduceRequest extends AbstractRequest {
}
}
+ @Override
+ public Map<Errors, Integer> errorCounts(Throwable e) {
+ Errors error = Errors.forException(e);
+ return Collections.singletonMap(error, partitions().size());
+ }
+
private Collection<TopicPartition> partitions() {
return partitionSizes.keySet();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index e1978dd..afedc9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -233,6 +233,14 @@ public class ProduceResponse extends AbstractResponse {
return this.throttleTime;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (PartitionResponse response : responses.values())
+ updateErrorCounts(errorCounts, response.error);
+ return errorCounts;
+ }
+
public static final class PartitionResponse {
public Errors error;
public long baseOffset;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index c950cb9..e244182 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
@@ -85,6 +86,11 @@ public class SaslAuthenticateResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
struct.set(ERROR_CODE, error.code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index c9f6369..252c62d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -76,6 +77,11 @@ public class SaslHandshakeResponse extends AbstractResponse {
}
@Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
+ @Override
public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
struct.set(ERROR_CODE, error.code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 4196b83..8ad6222 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -84,6 +84,11 @@ public class StopReplicaResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static StopReplicaResponse parse(ByteBuffer buffer, short version) {
return new StopReplicaResponse(ApiKeys.STOP_REPLICA.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index d68b2cd..77f9512 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
@@ -82,6 +83,11 @@ public class SyncGroupResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public ByteBuffer memberAssignment() {
return memberState;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 53804d9..ff0f8ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -125,6 +125,11 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
return errors;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(errors);
+ }
+
public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 9ff8e27..4c21cde 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -56,6 +57,11 @@ public class UpdateMetadataResponse extends AbstractResponse {
return error;
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(error);
+ }
+
public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 797fb59..f4bf157 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -150,6 +150,16 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
return errors.get(producerId);
}
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (Map<TopicPartition, Errors> allErrors : errors.values()) {
+ for (Errors error : allErrors.values())
+ updateErrorCounts(errorCounts, error);
+ }
+ return errorCounts;
+ }
+
public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index fe57d27..739e0cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -113,7 +113,7 @@ public class SaslServerAuthenticator implements Authenticator {
// Next SASL state to be set when outgoing writes associated with the current SASL state complete
private SaslState pendingSaslState = null;
// Exception that will be thrown by `authenticate()` when SaslState is set to FAILED after outbound writes complete
- private IOException pendingException = null;
+ private AuthenticationException pendingException = null;
private SaslServer saslServer;
private String saslMechanism;
private AuthCallbackHandler callbackHandler;
@@ -272,8 +272,15 @@ public class SaslServerAuthenticator implements Authenticator {
default:
break;
}
+ } catch (SaslException | AuthenticationException e) {
+ // Exception will be propagated after response is sent to client
+ AuthenticationException authException = (e instanceof AuthenticationException) ?
+ (AuthenticationException) e : new AuthenticationException("SASL authentication failed", e);
+ setSaslState(SaslState.FAILED, authException);
} catch (Exception e) {
- setSaslState(SaslState.FAILED, new IOException(e));
+ // In the case of IOExceptions and other unexpected exceptions, fail immediately
+ saslState = SaslState.FAILED;
+ throw e;
}
}
}
@@ -303,7 +310,7 @@ public class SaslServerAuthenticator implements Authenticator {
setSaslState(saslState, null);
}
- private void setSaslState(SaslState saslState, IOException exception) throws IOException {
+ private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
if (netOutBuffer != null && !netOutBuffer.completed()) {
pendingSaslState = saslState;
pendingException = exception;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 25e09e1..6053da6 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -24,6 +24,10 @@ import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,27 +55,49 @@ public class AppInfoParser {
return COMMIT_ID;
}
- public static synchronized void registerAppInfo(String prefix, String id) {
+ public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) {
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
AppInfo mBean = new AppInfo();
ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+
+ registerMetrics(metrics); // prefix will be added later by JmxReporter
} catch (JMException e) {
log.warn("Error registering AppInfo mbean", e);
}
}
- public static synchronized void unregisterAppInfo(String prefix, String id) {
+ public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
if (server.isRegistered(name))
server.unregisterMBean(name);
+
+ unregisterMetrics(metrics);
} catch (JMException e) {
log.warn("Error unregistering AppInfo mbean", e);
}
}
+ private static MetricName metricName(Metrics metrics, String name) {
+ return metrics.metricName(name, "app-info", "Metric indicating " + name);
+ }
+
+ private static void registerMetrics(Metrics metrics) {
+ if (metrics != null) {
+ metrics.addMetric(metricName(metrics, "version"), new ImmutableValue<>(VERSION));
+ metrics.addMetric(metricName(metrics, "commit-id"), new ImmutableValue<>(COMMIT_ID));
+ }
+ }
+
+ private static void unregisterMetrics(Metrics metrics) {
+ if (metrics != null) {
+ metrics.removeMetric(metricName(metrics, "version"));
+ metrics.removeMetric(metricName(metrics, "commit-id"));
+ }
+ }
+
public interface AppInfoMBean {
public String getVersion();
public String getCommitId();
@@ -95,4 +121,17 @@ public class AppInfoParser {
}
}
+
+ static class ImmutableValue<T> implements Gauge<T> {
+ private final T value;
+
+ public ImmutableValue(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public T value(MetricConfig config, long now) {
+ return value;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5263d3b..cab385c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -109,6 +109,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class FetcherTest {
private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
private String topicName = "test";
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index afccee0..3a92e63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -240,6 +240,7 @@ public class SenderTest {
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@Test
+ @SuppressWarnings("deprecation")
public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@@ -1619,6 +1620,7 @@ public class SenderTest {
}
@Test
+ @SuppressWarnings("deprecation")
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
@@ -1711,6 +1713,7 @@ public class SenderTest {
testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
}
+ @SuppressWarnings("deprecation")
private void testSplitBatchAndSend(TransactionManager txnManager,
ProducerIdAndEpoch producerIdAndEpoch,
TopicPartition tp) throws Exception {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index e24c3d7..55f8e23 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -46,6 +46,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class MetricsTest {
private static final double EPS = 0.000001;
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index 9db18e2..9b6f686 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -102,6 +102,7 @@ public class FrequenciesTest {
}
@Test
+ @SuppressWarnings("deprecation")
public void testUseWithMetrics() {
MetricName name1 = name("1");
MetricName name2 = name("2");
@@ -156,4 +157,4 @@ public class FrequenciesTest {
return new Frequency(name(name), value);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 190fa3d..8d510f5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -16,7 +16,11 @@
*/
package org.apache.kafka.common.network;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -24,6 +28,8 @@ import org.apache.kafka.common.security.scram.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -36,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
* Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
@@ -43,6 +50,9 @@ import java.util.List;
*
*/
public class NioEchoServer extends Thread {
+
+ private static final double EPS = 0.0001;
+
private final int port;
private final ServerSocketChannel serverSocketChannel;
private final List<SocketChannel> newChannels;
@@ -51,6 +61,7 @@ public class NioEchoServer extends Thread {
private final Selector selector;
private volatile WritableByteChannel outputChannel;
private final CredentialCache credentialCache;
+ private final Metrics metrics;
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
@@ -67,7 +78,8 @@ public class NioEchoServer extends Thread {
ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
if (channelBuilder == null)
channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
- this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+ this.metrics = new Metrics();
+ this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
acceptorThread = new AcceptorThread();
}
@@ -79,6 +91,43 @@ public class NioEchoServer extends Thread {
return credentialCache;
}
+ @SuppressWarnings("deprecation") // NOTE(review): presumably needed for KafkaMetric.value() below — confirm
+ public double metricValue(String name) { // Returns the value of a metric registered in this server's Metrics, matched on MetricName.name() alone
+ for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
+ if (entry.getKey().name().equals(name))
+ return entry.getValue().value(); // first entry whose name matches wins
+ }
+ throw new IllegalStateException("Metric not found, " + name + ", found=" + metrics.metrics().keySet()); // no match: report the requested name and every registered MetricName
+ }
+
+ public void verifyAuthenticationMetrics(int successfulAuthentications, final int failedAuthentications) // Verifies the "successful-authentication" and "failed-authentication" metrics against the expected counts
+ throws InterruptedException {
+ waitForMetric("successful-authentication", successfulAuthentications); // checks the -total and -rate metrics derived from this base name
+ waitForMetric("failed-authentication", failedAuthentications);
+ }
+
+ private void waitForMetric(String name, final double expectedValue) throws InterruptedException { // Verifies the "<name>-total" and "<name>-rate" metrics against an expected total
+ final String totalName = name + "-total";
+ final String rateName = name + "-rate";
+ if (expectedValue == 0.0) { // nothing expected: assert right away, without waiting, that both metrics are 0 within EPS
+ assertEquals(expectedValue, metricValue(totalName), EPS);
+ assertEquals(expectedValue, metricValue(rateName), EPS);
+ } else { // something expected: wait on each metric in turn via TestUtils.waitForCondition
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return Math.abs(metricValue(totalName) - expectedValue) <= EPS; // total must match expectedValue within EPS
+ }
+ }, "Metric not updated " + totalName);
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return metricValue(rateName) > 0.0; // rate only has to be positive; its exact value is not checked
+ }
+ }, "Metric not updated " + rateName);
+ }
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 8894873..be4dbc7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -506,7 +506,7 @@ public class SelectorTest {
// record void method invocations
kafkaChannel.disconnect();
kafkaChannel.close();
- expect(kafkaChannel.ready()).andReturn(false);
+ expect(kafkaChannel.ready()).andReturn(false).anyTimes();
// prepare throws an exception
kafkaChannel.prepare();
expectLastCall().andThrow(new IOException());
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 90c8cd5..440140b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -105,6 +105,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -230,6 +231,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -323,6 +325,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -343,6 +346,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -495,6 +499,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -512,6 +517,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -530,6 +536,7 @@ public class SslTransportLayerTest {
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index b41db67..6df7c2d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
@@ -47,10 +49,12 @@ public class FileRecordsTest {
"ijkl".getBytes()
};
private FileRecords fileRecords;
+ private Time time;
@Before
public void setup() throws IOException {
this.fileRecords = createFileRecords(values);
+ this.time = new MockTime();
}
/**
@@ -310,7 +314,7 @@ public class FileRecordsTest {
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
int size = batch.sizeInBytes();
FileRecords slice = fileRecords.read(start, size - 1);
- Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
+ Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
assertTrue("No message should be there", batches(messageV0).isEmpty());
assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
}
@@ -362,7 +366,7 @@ public class FileRecordsTest {
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.readableRecords(buffer));
fileRecords.flush();
- Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
+ Records convertedRecords = fileRecords.downConvert(toMagic, 0L, time).records();
verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
@@ -371,7 +375,7 @@ public class FileRecordsTest {
firstOffset = 11L; // v1 record
else
firstOffset = 17; // v2 record
- Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset);
+ Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset, time).records();
List<Long> filteredOffsets = new ArrayList<>(offsets);
List<SimpleRecord> filteredRecords = new ArrayList<>(records);
int index = filteredOffsets.indexOf(firstOffset) - 1;
@@ -380,7 +384,7 @@ public class FileRecordsTest {
verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
} else {
// firstOffset doesn't have any effect in this case
- Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L);
+ Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L, time).records();
verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index cc2bf79..4df3492 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -38,10 +40,12 @@ public class MemoryRecordsBuilderTest {
private final CompressionType compressionType;
private final int bufferOffset;
+ private final Time time;
public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
this.bufferOffset = bufferOffset;
this.compressionType = compressionType;
+ this.time = Time.SYSTEM;
}
@Test
@@ -456,7 +460,11 @@ public class MemoryRecordsBuilderTest {
buffer.flip();
- Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+ ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+ .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+ MemoryRecords records = convertedRecords.records();
+ verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
+ 3, 2, records.sizeInBytes(), buffer.limit());
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
if (compressionType != CompressionType.NONE) {
@@ -493,7 +501,11 @@ public class MemoryRecordsBuilderTest {
buffer.flip();
- Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
+ ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
+ .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
+ MemoryRecords records = convertedRecords.records();
+ verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+ records.sizeInBytes(), buffer.limit());
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
if (compressionType != CompressionType.NONE) {
@@ -517,7 +529,10 @@ public class MemoryRecordsBuilderTest {
assertEquals("2", utf8(logRecords.get(1).key()));
assertEquals("3", utf8(logRecords.get(2).key()));
- records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L);
+ convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L, time);
+ records = convertedRecords.records();
+ verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+ records.sizeInBytes(), buffer.limit());
batches = Utils.toList(records.batches().iterator());
logRecords = Utils.toList(records.records().iterator());
@@ -619,4 +634,26 @@ public class MemoryRecordsBuilderTest {
return values;
}
+ private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int recordCount, int convertedCount, // Asserts that processingStats is non-null and agrees with the expected conversion count and byte sizes
+ long finalBytes, long preConvertedBytes) {
+ assertNotNull("Records processing info is null", processingStats);
+ assertEquals(convertedCount, processingStats.conversionCount());
+ assertTrue("Processing time not recorded", processingStats.conversionTimeNanos() > 0);
+ long tempBytes = processingStats.temporaryMemoryBytes();
+ if (compressionType == CompressionType.NONE) { // uncompressed: temporaryMemoryBytes is checked against the record sizes
+ if (convertedCount == 0)
+ assertEquals(finalBytes, tempBytes); // nothing converted: exactly finalBytes
+ else if (convertedCount == recordCount)
+ assertEquals(preConvertedBytes + finalBytes, tempBytes); // everything converted: exactly preConvertedBytes + finalBytes
+ else { // some converted: strictly between those two bounds
+ assertTrue(String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes),
+ tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes);
+ }
+ } else { // compressed: only a lower bound on temporaryMemoryBytes is asserted
+ long compressedBytes = finalBytes - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0; // NOTE(review): presumably strips the wrapper entry's header from finalBytes — confirm
+ assertTrue(String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes),
+ tempBytes > compressedBytes);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c59f2c9..c2c8d81 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -126,6 +126,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -139,6 +140,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -153,6 +155,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -167,6 +170,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -268,6 +272,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnection(securityProtocol, "0");
+ server.verifyAuthenticationMetrics(1, 0);
}
/**
@@ -303,6 +308,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -321,6 +327,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -338,9 +345,11 @@ public class SaslAuthenticatorTest {
String node = "1";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
createAndCheckClientConnection(securityProtocol, "2");
+ server.verifyAuthenticationMetrics(1, 1);
}
/**
@@ -643,6 +652,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
@@ -657,6 +667,7 @@ public class SaslAuthenticatorTest {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
+ server.verifyAuthenticationMetrics(0, 1);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 9d8be8d..4abff84 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.security.scram.ScramMechanism.SCRAM_SHA_256;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SaslServerAuthenticatorTest {
@@ -100,8 +99,8 @@ public class SaslServerAuthenticatorTest {
try {
authenticator.authenticate();
fail("Expected authenticate() to raise an exception");
- } catch (IOException e) {
- assertTrue(e.getCause() instanceof IllegalSaslStateException);
+ } catch (IllegalSaslStateException e) {
+ // expected exception
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 8c8ba6d..eb01337 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -73,7 +73,7 @@ public class ConnectMetrics {
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
- AppInfoParser.registerAppInfo(JMX_PREFIX, workerId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics);
}
/**
@@ -164,7 +164,7 @@ public class ConnectMetrics {
public void stop() {
metrics.close();
LOG.debug("Unregistering Connect metrics with JMX for worker '{}'", workerId);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId, metrics);
}
public static class MetricGroupId {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d61a29c..6e19d11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -133,7 +133,7 @@ public class WorkerGroupMember {
configStorage,
listener);
- AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Connect group member created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
@@ -200,7 +200,7 @@ public class WorkerGroupMember {
ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
if (firstException.get() != null && !swallowException)
throw new KafkaException("Failed to stop the Connect group member", firstException.get());
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index bd193c7..306d64a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -22,7 +22,7 @@ import joptsimple._
import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils.{CommandLineUtils, ZkUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.security.JaasUtils
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5db2c5d..4ea61ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -964,12 +964,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
private def registerLogDirEventNotificationListener() = {
debug("Registering logDirEventNotificationListener")
- zkUtils.zkClient.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+ zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
}
private def deregisterLogDirEventNotificationListener() = {
debug("De-registering logDirEventNotificationListener")
- zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+ zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
}
private def readControllerEpochFromZookeeper() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f32da4b..e6c774a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,11 +48,11 @@ import java.util.regex.Pattern
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
- NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
- NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
@@ -65,6 +65,7 @@ object LogAppendInfo {
* @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
+ * @param recordsProcessingStats Statistics collected during record processing, `RecordsProcessingStats.EMPTY` if `assignOffsets` is `false`
* @param sourceCodec The source codec used in the message set (sent by the producer)
* @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
* @param shallowCount The number of shallow messages
@@ -77,6 +78,7 @@ case class LogAppendInfo(var firstOffset: Long,
var offsetOfMaxTimestamp: Long,
var logAppendTime: Long,
var logStartOffset: Long,
+ var recordsProcessingStats: RecordsProcessingStats,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
shallowCount: Int,
@@ -617,6 +619,7 @@ class Log(@volatile var dir: File,
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
offset,
+ time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
@@ -633,6 +636,7 @@ class Log(@volatile var dir: File,
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
+ appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
@@ -868,8 +872,8 @@ class Log(@volatile var dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
- LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, sourceCodec,
- targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ RecordsProcessingStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
private def updateProducers(batch: RecordBatch,
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b6b20e3..15750e9 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -23,6 +23,7 @@ import kafka.message.{CompressionCodec, NoCompressionCodec}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
import scala.collection.mutable
import scala.collection.JavaConverters._
@@ -46,6 +47,7 @@ private[kafka] object LogValidator extends Logging {
*/
private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
offsetCounter: LongRef,
+ time: Time,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
@@ -58,14 +60,14 @@ private[kafka] object LogValidator extends Logging {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
- convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
+ convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType,
timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
else
// Do in-place validation, offset assignment and maybe set timestamp
assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
partitionLeaderEpoch, isFromClient, magic)
} else {
- validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
+ validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
}
}
@@ -109,6 +111,7 @@ private[kafka] object LogValidator extends Logging {
private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
offsetCounter: LongRef,
compactedTopic: Boolean,
+ time: Time,
now: Long,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
@@ -137,12 +140,16 @@ private[kafka] object LogValidator extends Logging {
}
val convertedRecords = builder.build()
+
val info = builder.info
+ val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
+ builder.numRecords, time.nanoseconds - now)
ValidationAndOffsetAssignResult(
validatedRecords = convertedRecords,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
- messageSizeMaybeChanged = true)
+ messageSizeMaybeChanged = true,
+ recordsProcessingStats = recordsProcessingStats)
}
private def assignOffsetsNonCompressed(records: MemoryRecords,
@@ -203,7 +210,8 @@ private[kafka] object LogValidator extends Logging {
validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
- messageSizeMaybeChanged = false)
+ messageSizeMaybeChanged = false,
+ recordsProcessingStats = RecordsProcessingStats.EMPTY)
}
/**
@@ -215,6 +223,7 @@ private[kafka] object LogValidator extends Logging {
*/
def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
offsetCounter: LongRef,
+ time: Time,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
@@ -232,8 +241,11 @@ private[kafka] object LogValidator extends Logging {
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
+ var uncompressedSizeInBytes = 0
+
for (batch <- records.batches.asScala) {
validateBatch(batch, isFromClient, toMagic)
+ uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
// Do not compress control records unless they are written compressed
if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
@@ -244,6 +256,8 @@ private[kafka] object LogValidator extends Logging {
if (sourceCodec != NoCompressionCodec && record.isCompressed)
throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
s"compression attribute set: $record")
+
+ uncompressedSizeInBytes += record.sizeInBytes()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
// Check if we need to overwrite offset
// No in place assignment situation 3
@@ -269,8 +283,9 @@ private[kafka] object LogValidator extends Logging {
val first = records.batches.asScala.head
(first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
- buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
- validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
+ buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now,
+ validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
+ uncompressedSizeInBytes)
} else {
// we can update the batch only and write the compressed payload as is
val batch = records.batches.iterator.next()
@@ -287,15 +302,18 @@ private[kafka] object LogValidator extends Logging {
if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+ val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes, 0, -1)
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = lastOffset,
- messageSizeMaybeChanged = false)
+ messageSizeMaybeChanged = false,
+ recordsProcessingStats = recordsProcessingStats)
}
}
private def buildRecordsAndAssignOffsets(magic: Byte,
offsetCounter: LongRef,
+ time: Time,
timestampType: TimestampType,
compressionType: CompressionType,
logAppendTime: Long,
@@ -304,7 +322,10 @@ private[kafka] object LogValidator extends Logging {
producerEpoch: Short,
baseSequence: Int,
isTransactional: Boolean,
- partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ partitionLeaderEpoch: Int,
+ isFromClient: Boolean,
+ uncompresssedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
+ val startNanos = time.nanoseconds
val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
validatedRecords.asJava)
val buffer = ByteBuffer.allocate(estimatedSize)
@@ -316,13 +337,23 @@ private[kafka] object LogValidator extends Logging {
}
val records = builder.build()
+
val info = builder.info
+ // This is not strictly correct: it represents the number of records where in-place assignment is not possible
+ // instead of the number of records that were converted. It will over-count cases where the source and target are
+ // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
+ // to rebuild the records (including recompression if enabled).
+ val conversionCount = builder.numRecords
+ val recordsProcessingStats = new RecordsProcessingStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
+ conversionCount, time.nanoseconds - startNanos)
+
ValidationAndOffsetAssignResult(
validatedRecords = records,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
- messageSizeMaybeChanged = true)
+ messageSizeMaybeChanged = true,
+ recordsProcessingStats = recordsProcessingStats)
}
private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -352,6 +383,7 @@ private[kafka] object LogValidator extends Logging {
case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
maxTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
- messageSizeMaybeChanged: Boolean)
+ messageSizeMaybeChanged: Boolean,
+ recordsProcessingStats: RecordsProcessingStats)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 1894213..b9ab486 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,19 +40,22 @@ trait KafkaMetricsGroup extends Logging {
* @param tags Additional attributes which mBean will have.
* @return Sanitized metric name object.
*/
- private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
+ protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
- // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
- def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
- val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
- explicitMetricName(pkg, simpleName, name, metricTags)
+ explicitMetricName(pkg, simpleName, name, tags)
}
- private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
+ protected def explicitMetricName(group: String, typeName: String, name: String,
+ tags: scala.collection.Map[String, String]): MetricName = {
+
+ // Tags may contain ipv6 address with ':', which is not valid in JMX ObjectName
+ def quoteIfRequired(value: String) = if (value.contains(':')) ObjectName.quote(value) else value
+ val metricTags = tags.map(kv => (kv._1, quoteIfRequired(kv._2)))
+
val nameBuilder: StringBuilder = new StringBuilder
nameBuilder.append(group)
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index b2ef615..e5f115c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,19 +21,20 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent._
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Sanitizer
import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.log4j.Logger
+import scala.collection.mutable
import scala.reflect.ClassTag
object RequestChannel extends Logging {
@@ -60,6 +61,8 @@ object RequestChannel extends Logging {
@volatile var responseCompleteTimeNanos = -1L
@volatile var responseDequeueTimeNanos = -1L
@volatile var apiRemoteCompleteTimeNanos = -1L
+ @volatile var messageConversionsTimeNanos = 0L
+ @volatile var temporaryMemoryBytes = 0L
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
val session = Session(context.principal, context.clientAddress)
@@ -121,6 +124,7 @@ object RequestChannel extends Logging {
val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+ val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
val fetchMetricNames =
if (header.apiKey == ApiKeys.FETCH) {
@@ -133,7 +137,7 @@ object RequestChannel extends Logging {
else Seq.empty
val metricNames = fetchMetricNames :+ header.apiKey.name
metricNames.foreach { metricName =>
- val m = RequestMetrics.metricsMap(metricName)
+ val m = RequestMetrics(metricName)
m.requestRate.mark()
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
@@ -142,6 +146,9 @@ object RequestChannel extends Logging {
m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
m.totalTimeHist.update(Math.round(totalTimeMs))
+ m.requestBytesHist.update(sizeOfBodyInBytes)
+ m.messageConversionsTimeHist.foreach(_.update(Math.round(messageConversionsTimeMs)))
+ m.tempMemoryBytesHist.foreach(_.update(temporaryMemoryBytes))
}
// Records network handler thread usage. This is included towards the request quota for the
@@ -171,6 +178,10 @@ object RequestChannel extends Logging {
.append(",securityProtocol:").append(context.securityProtocol)
.append(",principal:").append(session.principal)
.append(",listener:").append(context.listenerName.value)
+ if (temporaryMemoryBytes > 0)
+ builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes)
+ if (messageConversionsTimeMs > 0)
+ builder.append(",messageConversionsTime:").append(messageConversionsTimeMs)
requestLogger.debug(builder.toString)
}
}
@@ -281,6 +292,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
responseListeners ::= onResponse
}
+ def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
+ errors.foreach { case (error, count) =>
+ RequestMetrics.markErrorMeter(apiKey.name, error, count)
+ }
+ }
+
def shutdown() {
requestQueue.clear()
}
@@ -290,11 +307,30 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
object RequestMetrics {
- val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+
+ private val metricsMap = mutable.Map[String, RequestMetrics]()
+
val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
- (ApiKeys.values().toList.map(e => e.name)
- ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+
+ (ApiKeys.values.toSeq.map(_.name) ++ Seq(consumerFetchMetricName, followFetchMetricName)).foreach { name =>
+ metricsMap.put(name, new RequestMetrics(name))
+ }
+
+ def apply(metricName: String) = metricsMap(metricName)
+
+ def markErrorMeter(name: String, error: Errors, count: Int) {
+ val errorMeter = metricsMap(name).errorMeters(error)
+ errorMeter.getOrCreateMeter().mark(count.toLong)
+ }
+
+ // Used for testing until these metrics are moved to a class
+ private[kafka] def clearErrorMeters(): Unit = {
+ metricsMap.values.foreach { requestMetrics =>
+ requestMetrics.errorMeters.values.foreach(_.removeMeter())
+ }
+ }
+
}
class RequestMetrics(name: String) extends KafkaMetricsGroup {
@@ -306,11 +342,58 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
// time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
- // time a request is throttled (only relevant to fetch and produce requests)
+ // time a request is throttled
val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
// time a response spent in a response queue
val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
// time to send the response to the requester
val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
+ // request size in bytes
+ val requestBytesHist = newHistogram("RequestBytes", biased = true, tags)
+ // time for message conversions (only relevant to fetch and produce requests)
+ val messageConversionsTimeHist =
+ if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+ Some(newHistogram("MessageConversionsTimeMs", biased = true, tags))
+ else
+ None
+ // Temporary memory allocated for processing request (only populated for fetch and produce requests)
+ // This shows the memory allocated for compression/conversions excluding the actual request size
+ val tempMemoryBytesHist =
+ if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
+ Some(newHistogram("TemporaryMemoryBytes", biased = true, tags))
+ else
+ None
+
+ private val errorMeters = mutable.Map[Errors, ErrorMeter]()
+ Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
+
+ class ErrorMeter(name: String, error: Errors) {
+ private val tags = Map("request" -> name, "error" -> error.name)
+
+ @volatile private var meter: Meter = null
+
+ def getOrCreateMeter(): Meter = {
+ if (meter != null)
+ meter
+ else {
+ synchronized {
+ if (meter == null)
+ meter = newMeter("ErrorsPerSec", "requests", TimeUnit.SECONDS, tags)
+ meter
+ }
+ }
+ }
+
+ // This is currently used only in tests.
+ def removeMeter(): Unit = {
+ synchronized {
+ if (meter != null) {
+ removeMetric("ErrorsPerSec", tags)
+ meter = null
+ }
+ }
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index fd31fc4..2c83c1b 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -25,7 +25,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c84fbcb..afaa5dd 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,8 +16,6 @@
*/
package kafka.server
-import java.net.{URLEncoder, URLDecoder}
-import java.nio.charset.StandardCharsets
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock