You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/26 20:05:01 UTC

kafka git commit: KAFKA-2478: Fix manual committing example in javadoc

Repository: kafka
Updated Branches:
  refs/heads/trunk 5ae97196a -> 82c219149


KAFKA-2478: Fix manual committing example in javadoc

Committing before inserting all records into the database
might lead to some records being lost.

I've changed the example to commit only after all records
returned by `poll` are inserted into the database.

Author: Dmitry Stratiychuk <ds...@yammer-inc.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #210 from shtratos/KAFKA-2478


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82c21914
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82c21914
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82c21914

Branch: refs/heads/trunk
Commit: 82c219149027e8d96840af98d32fb1b877ab4ec2
Parents: 5ae9719
Author: Dmitry Stratiychuk <ds...@yammer-inc.com>
Authored: Tue Jan 26 11:04:58 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jan 26 11:04:58 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java     | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/82c21914/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2f78139..afe3240 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -145,7 +145,7 @@ import java.util.regex.Pattern;
  *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
+ *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
  *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
  *     while (true) {
  *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
@@ -200,19 +200,19 @@ import java.util.regex.Pattern;
  *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
+ *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
  *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
- *     int commitInterval = 200;
- *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;ConsumerRecord&lt;String, String&gt;&gt;();
+ *     final int minBatchSize = 200;
+ *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;&gt;();
  *     while (true) {
  *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
  *         for (ConsumerRecord&lt;String, String&gt; record : records) {
  *             buffer.add(record);
- *             if (buffer.size() &gt;= commitInterval) {
- *                 insertIntoDb(buffer);
- *                 consumer.commitSync();
- *                 buffer.clear();
- *             }
+ *         }
+ *         if (buffer.size() &gt;= minBatchSize) {
+ *             insertIntoDb(buffer);
+ *             consumer.commitSync();
+ *             buffer.clear();
  *         }
  *     }
  * </pre>