Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/27 22:26:50 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1094] Added documentation of High level consumer

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f7800d3  [GOBBLIN-1094] Added documentation of High level consumer
f7800d3 is described below

commit f7800d34ca08aad1c09dbfdbef376a90f6a1cc27
Author: vbohra <vb...@linkedin.com>
AuthorDate: Fri Mar 27 15:26:42 2020 -0700

    [GOBBLIN-1094] Added documentation of High level consumer
    
    Closes #2935 from vikrambohra/GOBBLIN-1094
---
 gobblin-docs/developer-guide/HighLevelConsumer.md | 67 +++++++++++++++++++++++
 mkdocs.yml                                        |  1 +
 2 files changed, 68 insertions(+)

diff --git a/gobblin-docs/developer-guide/HighLevelConsumer.md b/gobblin-docs/developer-guide/HighLevelConsumer.md
new file mode 100644
index 0000000..3f229b7
--- /dev/null
+++ b/gobblin-docs/developer-guide/HighLevelConsumer.md
@@ -0,0 +1,67 @@
+
+
+Problem Statement
+=================
+
+The current Gobblin Kafka [`High Level Consumer`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java) uses the Kafka consumer (0.8) APIs, and Gobblin support for them will be deprecated. The primary goal of the redesign is to replace old Kafka consumer APIs like [`ConsumerConnector`](https://archive.apache.org/dist/kafka/0.8.2.2/scaladoc/index.html#kafka.consumer.ConsumerConnector) and [`MessageAndMetadata`](ht [...]
+Additionally, the old design uses Kafka's auto-commit feature, which can lose messages when offsets are committed and the system fails before those messages are processed.
+
+New Design & Details 
+====================
+
+GobblinKafkaConsumerClient
+
+The new design uses [`GobblinKafkaConsumerClient`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java), which is a simplified, generic wrapper client for communicating with Kafka. This interface does not depend on classes defined in the kafka-clients library, which allows the high level consumer to work with different versions of Kafka. Concrete classes implementing this interface u [...]
+
+
+Manual Offset Commit
+
+The `GobblinKafkaConsumerClient` API has been enhanced to allow offsets to be committed manually.
+
+``` java 
+  /**
+   * Commit offsets manually to Kafka asynchronously
+   */
+  default void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+    return;
+  }
+
+  /**
+   * Commit offsets manually to Kafka synchronously
+   */
+  default void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+    return;
+  }
+
+  /**
+   * returns the last committed offset for a KafkaPartition
+   * @param partition
+   * @return last committed offset or -1 for invalid KafkaPartition
+   */
+  default long committed(KafkaPartition partition) {
+    return -1L;
+  }
+```
+
+The high level consumer records topic partitions and their offsets only after the messages are processed, and commits them periodically to Kafka. This ensures at-least-once delivery in case of a failure.
+
+Additionally, APIs are provided to subscribe to a topic along with a [`GobblinConsumerRebalanceListener`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java), which provides hooks that run when a consumer joins or leaves a consumer group.
+When such a rebalance occurs, the high level consumer commits any remaining offsets and clears its offset caches.
+
+``` java 
+  /**
+   * Subscribe to a topic
+   * @param topic
+   */
+  default void subscribe(String topic) {
+    return;
+  }
+
+  /**
+   * Subscribe to a topic along with a GobblinConsumerRebalanceListener
+   * @param topic
+   */
+  default void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+    return;
+  }
+```
\ No newline at end of file
diff --git a/mkdocs.yml b/mkdocs.yml
index 4a0442c..84d3f75 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -99,6 +99,7 @@ pages:
         - Documentation Architecture: developer-guide/Documentation-Architecture.md
         - Contributing: developer-guide/Contributing.md
         - Gobblin Modules: developer-guide/GobblinModules.md
+        - High Level Consumer: developer-guide/HighLevelConsumer.md
     - Project:
         - Feature List: project/Feature-List.md
         - Contributors and Team: project/Team.md
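
Note on the manual offset commit flow documented in HighLevelConsumer.md above: the idea of recording an offset only after its message has been processed, then committing the recorded offsets periodically, can be sketched roughly as follows. This is a minimal illustration, not Gobblin's implementation. `OffsetCommitSketch`, `CommitClient`, and `OffsetTracker` are hypothetical names. Partitions are plain strings rather than `KafkaPartition`. Whether the committed value should be the last processed offset or the next offset to read is left to the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these types are Gobblin or Kafka classes.
class OffsetCommitSketch {

  /** Stand-in for the manual-commit method of a consumer client. */
  interface CommitClient {
    void commitOffsetsAsync(Map<String, Long> partitionOffsets);
  }

  /** Caches offsets of processed messages and commits them on flush. */
  static class OffsetTracker {
    private final Map<String, Long> pending = new HashMap<>();
    private final CommitClient client;

    OffsetTracker(CommitClient client) {
      this.client = client;
    }

    /** Call only after the message at this offset has been fully processed. */
    synchronized void recordProcessed(String partition, long offset) {
      // Keep the highest processed offset seen for each partition.
      pending.merge(partition, offset, Math::max);
    }

    /** Call periodically, and from a rebalance hook, to commit and clear the cache. */
    synchronized void flush() {
      if (pending.isEmpty()) {
        return;
      }
      client.commitOffsetsAsync(new HashMap<>(pending));
      pending.clear();
    }
  }

  public static void main(String[] args) {
    Map<String, Long> committed = new HashMap<>();
    OffsetTracker tracker = new OffsetTracker(committed::putAll);

    tracker.recordProcessed("topic-0", 41L);
    tracker.recordProcessed("topic-0", 42L);
    // A failure at this point commits nothing, so both messages would be
    // redelivered after restart: at-least-once rather than message loss.
    tracker.flush();

    System.out.println(committed); // prints {topic-0=42}
  }
}
```

Because `flush()` is the only place offsets leave the cache, calling it from a rebalance hook covers the "commit remaining offsets and clear offset caches" step in the same way.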