You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/13 23:17:24 UTC
kafka git commit: KAFKA-1863;
Add docs for possible thrown exception in Callback; reviewed by Jiangjie Qin
Repository: kafka
Updated Branches:
refs/heads/trunk c41c7b40b -> 1caaf6db4
KAFKA-1863; Add docs for possible thrown exception in Callback; reviewed by Jiangjie Qin
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1caaf6db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1caaf6db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1caaf6db
Branch: refs/heads/trunk
Commit: 1caaf6db400df7e37b7f0416bb83ab451018a5c8
Parents: c41c7b4
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Mar 13 15:17:08 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Mar 13 15:17:08 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/producer/Callback.java | 19 +++++++++++++++++
.../kafka/common/protocol/types/Field.java | 5 +++++
.../kafka/common/protocol/types/Schema.java | 5 +++++
.../common/protocol/types/SchemaException.java | 3 +++
.../kafka/common/protocol/types/Struct.java | 10 +++++++++
.../kafka/common/protocol/types/Type.java | 22 ++++++++++++++++++--
6 files changed, 62 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caaf6db/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index b89aa58..7caefc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -29,6 +29,25 @@ public interface Callback {
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
+ * Possible thrown exceptions include:
+ *
+ * Non-Retriable exceptions (fatal, the message will never be sent):
+ *
+ * InvalidTopicException
+ * OffsetMetadataTooLargeException
+ * RecordBatchTooLargeException
+ * RecordTooLargeException
+ * UnknownServerException
+ *
+ * Retriable exceptions (transient, may be resolved by increasing the number of retries):
+ *
+ * CorruptRecordException
+ * InvalidMetadataException
+ * NotEnoughReplicasAfterAppendException
+ * NotEnoughReplicasException
+ * OffsetOutOfRangeException
+ * TimeoutException
+ * UnknownTopicOrPartitionException
*/
public void onCompletion(RecordMetadata metadata, Exception exception);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caaf6db/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 8991958..1eb1195 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -30,6 +30,11 @@ public class Field {
public final String doc;
final Schema schema;
+ /**
+ * Create the field.
+ *
+ * @throws SchemaException If the default value is not primitive and the validation fails
+ */
public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
this.index = index;
this.name = name;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caaf6db/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 7164701..3a14ac0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -24,6 +24,11 @@ public class Schema extends Type {
private final Field[] fields;
private final Map<String, Field> fieldsByName;
+ /**
+ * Construct the schema with a given list of its field values
+ *
+ * @throws SchemaException If the given list has duplicate fields
+ */
public Schema(Field... fs) {
this.fields = new Field[fs.length];
this.fieldsByName = new HashMap<String, Field>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caaf6db/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java
index ea4e46f..58b685b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/SchemaException.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.protocol.types;
import org.apache.kafka.common.KafkaException;
+/**
+ * Thrown if the protocol schema validation fails while parsing request or response.
+ */
public class SchemaException extends KafkaException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caaf6db/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ff89f0e..7672a3a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -60,6 +60,7 @@ public class Struct {
*
* @param field The field to look up
* @return The value for that field.
+ * @throws SchemaException if the field has no value and has no default.
*/
public Object get(Field field) {
validateField(field);
@@ -71,6 +72,7 @@ public class Struct {
*
* @param name The name of the field
* @return The value in the record
+ * @throws SchemaException If no such field exists
*/
public Object get(String name) {
Field field = schema.get(name);
@@ -149,6 +151,7 @@ public class Struct {
*
* @param field The field
* @param value The value
+ * @throws SchemaException If the validation of the field failed
*/
public Struct set(Field field, Object value) {
validateField(field);
@@ -161,6 +164,7 @@ public class Struct {
*
* @param name The name of the field
* @param value The value to set
+ * @throws SchemaException If the field is not known
*/
public Struct set(String name, Object value) {
Field field = this.schema.get(name);
@@ -177,6 +181,7 @@ public class Struct {
*
* @param field The field to create an instance of
* @return The struct
+ * @throws SchemaException If the given field is not a container type
*/
public Struct instance(Field field) {
validateField(field);
@@ -195,6 +200,7 @@ public class Struct {
*
* @param field The name of the field to create (field must be a schema type)
* @return The struct
+ * @throws SchemaException If the given field is not a container type
*/
public Struct instance(String field) {
return instance(schema.get(field));
@@ -223,6 +229,8 @@ public class Struct {
/**
* Ensure the user doesn't try to access fields from the wrong schema
+ *
+ * @throws SchemaException If validation fails
*/
private void validateField(Field field) {
if (this.schema != field.schema)
@@ -233,6 +241,8 @@ public class Struct {
/**
* Validate the contents of this struct against its schema
+ *
+ * @throws SchemaException If validation fails
*/
public void validate() {
this.schema.validate(this);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caaf6db/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index f0d5a82..9ea28b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -25,14 +25,32 @@ import org.apache.kafka.common.utils.Utils;
*/
public abstract class Type {
+ /**
+ * Write the typed object to the buffer
+ *
+ * @throws SchemaException If the object is not valid for its type
+ */
public abstract void write(ByteBuffer buffer, Object o);
+ /**
+ * Read the typed object from the buffer
+ *
+ * @throws SchemaException If the object is not valid for its type
+ */
public abstract Object read(ByteBuffer buffer);
- public abstract int sizeOf(Object o);
-
+ /**
+ * Validate the object. If validation succeeds, return the typed object.
+ *
+ * @throws SchemaException If validation failed
+ */
public abstract Object validate(Object o);
+ /**
+ * Return the size of the object in bytes
+ */
+ public abstract int sizeOf(Object o);
+
public static final Type INT8 = new Type() {
@Override
public void write(ByteBuffer buffer, Object o) {