You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/17 03:09:50 UTC
kafka git commit: KAFKA-1723;
(followup patch to fix javadoc for java 8) make the metrics name in
new producer more standard; patched by Manikumar Reddy; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/0.8.2 17c8bdcbb -> 988e695fa
KAFKA-1723; (followup patch to fix javadoc for java 8) make the metrics name in new producer more standard; patched by Manikumar Reddy; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/988e695f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/988e695f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/988e695f
Branch: refs/heads/0.8.2
Commit: 988e695fa294c5e9799509f6313b95e1f96682da
Parents: 17c8bdc
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Fri Jan 16 18:09:42 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 16 18:09:42 2015 -0800
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 34 ++++++++++----------
.../kafka/clients/producer/MockProducer.java | 8 ++---
.../org/apache/kafka/common/MetricName.java | 24 ++++++++------
3 files changed, 36 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/988e695f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0bfda4b..30477d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -264,33 +264,33 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* sending the record.
* <p>
* If you want to simulate a simple blocking call you can do the following:
- *
- * <pre>
- * producer.send(new ProducerRecord("the-topic", "key, "value")).get();
- * </pre>
+ *
+ * <pre>{@code
+ * producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
+ * }</pre>
* <p>
* Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
- *
- * <pre>
- * ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
+ *
+ * <pre>{@code
+ * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
* producer.send(myRecord,
- * new Callback() {
+ * new Callback() {
* public void onCompletion(RecordMetadata metadata, Exception e) {
* if(e != null)
* e.printStackTrace();
* System.out.println("The offset of the record we just sent is: " + metadata.offset());
* }
- * });
- * </pre>
- *
+ * });
+ * }</pre>
+ *
* Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
- *
- * <pre>
- * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
- * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
- * </pre>
+ *
+ * <pre>{@code
+ * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
+ * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
+ * }</pre>
* <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
* they will delay the sending of messages from other threads. If you want to execute blocking or computationally
@@ -329,7 +329,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
- ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
+ ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
int partition = partitioner.partition(serializedRecord, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
http://git-wip-us.apache.org/repos/asf/kafka/blob/988e695f/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 21c25a9..cdca682 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -41,7 +41,7 @@ public class MockProducer implements Producer<byte[],byte[]> {
private final Cluster cluster;
private final Partitioner partitioner = new Partitioner();
- private final List<ProducerRecord> sent;
+ private final List<ProducerRecord<byte[], byte[]>> sent;
private final Deque<Completion> completions;
private boolean autoComplete;
private Map<TopicPartition, Long> offsets;
@@ -59,7 +59,7 @@ public class MockProducer implements Producer<byte[],byte[]> {
this.cluster = cluster;
this.autoComplete = autoComplete;
this.offsets = new HashMap<TopicPartition, Long>();
- this.sent = new ArrayList<ProducerRecord>();
+ this.sent = new ArrayList<ProducerRecord<byte[], byte[]>>();
this.completions = new ArrayDeque<Completion>();
}
@@ -144,8 +144,8 @@ public class MockProducer implements Producer<byte[],byte[]> {
/**
* Get the list of sent records since the last call to {@link #clear()}
*/
- public synchronized List<ProducerRecord> history() {
- return new ArrayList<ProducerRecord>(this.sent);
+ public synchronized List<ProducerRecord<byte[], byte[]>> history() {
+ return new ArrayList<ProducerRecord<byte[], byte[]>>(this.sent);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/988e695f/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 4e810d5..7e977e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -19,34 +19,40 @@ import org.apache.kafka.common.utils.Utils;
/**
* The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
- * <p/>
+ * <p>
* This class captures the following parameters
* <pre>
* <b>name</b> The name of the metric
* <b>group</b> logical group name of the metrics to which this metric belongs.
* <b>description</b> A human-readable description to include in the metric. This is optional.
* <b>tags</b> additional key/value attributes of the metric. This is optional.
- * </pre>
+ * </pre>
* group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- *
+ * <p>
* Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
- *
+ * <p>
* Usage looks something like this:
- * <pre>
+ * <pre>{@code
* // set up metrics:
* Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
+ * Sensor sensor = metrics.sensor("message-sizes");
+ *
* Map<String, String> metricTags = new LinkedHashMap<String, String>();
* metricTags.put("client-id", "producer-1");
* metricTags.put("topic", "topic");
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ *
+ * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
* sensor.add(metricName, new Avg());
- * metricName = new MetricName("message-size-max", "producer-metrics",metricTags);
+ *
+ * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
* sensor.add(metricName, new Max());
*
+ * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
+ * sensor.add(metricName, new Min());
+ *
* // as messages are sent we record the sizes
* sensor.record(messageSize);
- * </pre>
+ * }</pre>
*/
public final class MetricName {