Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/09/11 04:09:20 UTC

[GitHub] [incubator-gobblin] sv2000 opened a new pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

sv2000 opened a new pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1259
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   This PR provides an implementation of a Kafka extractor that uses Gobblin's streaming mode. The streaming extractor periodically generates Flush control messages that allow the pipeline to commit data to the destination system. For Kafka -> HDFS, the Flush message serves as the logical point at which files are committed to the destination publish directory and watermarks are checkpointed to the state store. The extractor also tracks various statistics about the input source, such as moving averages of the production rate and record sizes, which the KafkaSource will use when computing work units.
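The flush-and-track behavior described above can be illustrated with a small, self-contained sketch. It is not Gobblin's `FlushingExtractor` API: the class name, the `FLUSH` string marker, the logical millisecond clock, the use of string length as record size, and the smoothing factor `alpha` are all assumptions for illustration. For simplicity it only checks the flush timer when a record arrives, and it uses an exponentially weighted moving average (EWMA) as one possible form of the moving average mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not Gobblin's FlushingExtractor: passes records through, injects a
 * FLUSH marker once flushIntervalMillis has elapsed since the previous flush, and keeps an
 * exponentially weighted moving average (EWMA) of record sizes.
 */
class FlushingStreamSketch {
  /** Marker sent downstream; a real pipeline would commit files and checkpoint watermarks here. */
  static final String FLUSH = "<FLUSH>";

  private final long flushIntervalMillis;
  private final double alpha; // EWMA smoothing factor in (0, 1]; hypothetical parameter
  private long lastFlushTime;
  private double avgRecordSize = Double.NaN; // NaN until the first record is seen

  FlushingStreamSketch(long flushIntervalMillis, double alpha, long startTimeMillis) {
    this.flushIntervalMillis = flushIntervalMillis;
    this.alpha = alpha;
    this.lastFlushTime = startTimeMillis;
  }

  /** Handles one record read at logical time nowMillis and returns what goes downstream. */
  List<String> onRecord(String record, long nowMillis) {
    List<String> out = new ArrayList<>();
    // Flush before the new record so the flush covers only records already emitted.
    if (nowMillis - lastFlushTime >= flushIntervalMillis) {
      out.add(FLUSH);
      lastFlushTime = nowMillis;
    }
    out.add(record);
    int size = record.length(); // stand-in for the serialized record size
    avgRecordSize = Double.isNaN(avgRecordSize)
        ? size
        : alpha * size + (1 - alpha) * avgRecordSize;
    return out;
  }

  double getAvgRecordSize() {
    return avgRecordSize;
  }

  public static void main(String[] args) {
    FlushingStreamSketch sketch = new FlushingStreamSketch(1000L, 0.5, 0L);
    List<String> downstream = new ArrayList<>();
    downstream.addAll(sketch.onRecord("aa", 100L));
    downstream.addAll(sketch.onRecord("aaaa", 900L));
    downstream.addAll(sketch.onRecord("aaaaaa", 1200L)); // 1200 - 0 >= 1000, so flush first
    System.out.println(downstream);                // [aa, aaaa, <FLUSH>, aaaaaa]
    System.out.println(sketch.getAvgRecordSize()); // 4.5
  }
}
```

Emitting the marker before the triggering record keeps each flush boundary aligned with data that has already been handed downstream, which is what makes it a safe commit and checkpoint point.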
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Added unit tests.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
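Several of these rules are mechanical and can be checked automatically. The helper below is a hypothetical sketch (not part of Gobblin or its build) that flags a missing `GOBBLIN-XXX` reference and violations of rules 1, 2, 3, and 5; rules 4 (imperative mood) and 6 (what and why) still need a human reviewer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical checker for the mechanically verifiable commit-message rules. */
class CommitMessageCheck {
  private static final Pattern JIRA_REF = Pattern.compile("GOBBLIN-\\d+");

  static List<String> violations(String message) {
    List<String> problems = new ArrayList<>();
    String[] lines = message.split("\n", -1); // -1 keeps empty lines, e.g. the separator
    String subject = lines[0];
    if (!JIRA_REF.matcher(subject).find()) {
      problems.add("subject does not reference a GOBBLIN-XXX issue");
    }
    if (subject.length() > 50) {
      problems.add("subject is longer than 50 characters"); // rule 2
    }
    if (subject.endsWith(".")) {
      problems.add("subject ends with a period"); // rule 3
    }
    if (lines.length > 1 && !lines[1].isEmpty()) {
      problems.add("subject is not followed by a blank line"); // rule 1
    }
    for (int i = 2; i < lines.length; i++) {
      if (lines[i].length() > 72) {
        problems.add("body line " + (i + 1) + " is longer than 72 characters"); // rule 5
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    System.out.println(violations(
        "GOBBLIN-1259: Add Kafka streaming extractor\n\nEmit periodic flush messages so data is committed."));
    // []
    System.out.println(violations("Adding stuff."));
    // [subject does not reference a GOBBLIN-XXX issue, subject ends with a period]
  }
}
```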
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r488040221



##########
File path: gobblin-core-base/build.gradle
##########
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
+  compile project(":gobblin-metastore")

Review comment:
       FlushingExtractor, which lives in gobblin-core-base, depends on StateStoreBasedWatermarkStorage, which this PR moves to gobblin-metastore.





[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r488037752



##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
##########
@@ -0,0 +1,23 @@
+package org.apache.gobblin.metrics;

Review comment:
       Fixed. Thanks.





[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r501333274



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.avro.Schema;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.gson.JsonElement;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.ComparableWatermark;
+import org.apache.gobblin.source.extractor.Watermark;
+import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.FlushRecordEnvelope;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaProduceRateTracker.KAFKA_PARTITION_PRODUCE_RATE_KEY;
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS;
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS;
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker.NUM_PARTITIONS_ASSIGNED;
+
+/**
+ * An implementation of {@link org.apache.gobblin.source.extractor.Extractor} which reads from Kafka and returns records.
+ * The record type depends on the configured deserializer.
+ */
+@Slf4j
+public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
+  public static final String DATASET_KEY = "dataset";
+  public static final String DATASET_PARTITION_KEY = "datasetPartition";
+  public static final String LIST_DELIMITER_KEY = ",";
+  public static final String RANGE_DELIMITER_KEY = "-";
+  private static final Long MAX_LOG_DECODING_ERRORS = 5L;
+  private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
+      "gobblin.kafka.extractor.statsReportingIntervalMinutes";
+  private static final Long DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES = 1L;
+
+  private final ClassAliasResolver<GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory>
+      kafkaConsumerClientResolver;
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+  private final Map<String, AtomicDouble> consumerMetricsGauges = new ConcurrentHashMap<>();
+  private final KafkaExtractorStatsTracker statsTracker;
+  private final KafkaProduceRateTracker produceRateTracker;
+  private final List<KafkaPartition> partitions;
+  private final long extractorStatsReportingTimeIntervalMillis;
+  //Mapping from Kafka Partition Id to partition index
+  @Getter
+  private final Map<Integer, Integer> partitionIdToIndexMap;
+  private final String recordCreationTimestampFieldName;
+  private final TimeUnit recordCreationTimestampUnit;
+
+  private Iterator<KafkaConsumerRecord> messageIterator = null;
+  private long readStartTime;
+  private long lastExtractorStatsReportingTime;
+  private Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
+
+  protected MultiLongWatermark lowWatermark;
+  protected MultiLongWatermark highWatermark;
+  protected MultiLongWatermark nextWatermark;
+  protected Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+  @Override
+  public void shutdown() {
+    this.shutdownRequested.set(true);
+  }
+
+  @ToString
+  public static class KafkaWatermark implements CheckpointableWatermark {
+
+    KafkaPartition _topicPartition;

Review comment:
       Shall we keep the variable naming convention consistent here (i.e., no underscore prefix)?
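As an illustration of the suggested convention, here is a hypothetical, dependency-free stand-in for the reviewed `KafkaWatermark` with camelCase fields. Only `_topicPartition` comes from the diff; the `offset` field, the `String` partition identifier (in place of Gobblin's `KafkaPartition`), and the choice to reject comparisons across partitions are assumptions, not the merged code.

```java
import java.util.Objects;

/** Hypothetical stand-in for KafkaWatermark using camelCase field names (no underscore prefix). */
final class KafkaWatermarkSketch implements Comparable<KafkaWatermarkSketch> {
  private final String topicPartition; // named `_topicPartition` in the reviewed diff
  private final long offset;           // hypothetical: offset recorded for that partition

  KafkaWatermarkSketch(String topicPartition, long offset) {
    this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition");
    this.offset = offset;
  }

  String getTopicPartition() {
    return topicPartition;
  }

  long getOffset() {
    return offset;
  }

  /** Offsets are only comparable within a single partition. */
  @Override
  public int compareTo(KafkaWatermarkSketch other) {
    if (!topicPartition.equals(other.topicPartition)) {
      throw new IllegalArgumentException(
          "Cannot compare watermarks of " + topicPartition + " and " + other.topicPartition);
    }
    return Long.compare(offset, other.offset);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof KafkaWatermarkSketch)) {
      return false;
    }
    KafkaWatermarkSketch that = (KafkaWatermarkSketch) o;
    return offset == that.offset && topicPartition.equals(that.topicPartition);
  }

  @Override
  public int hashCode() {
    return Objects.hash(topicPartition, offset);
  }

  @Override
  public String toString() {
    return "KafkaWatermarkSketch(topicPartition=" + topicPartition + ", offset=" + offset + ")";
  }
}
```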





[GitHub] [incubator-gobblin] codecov-io commented on pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#issuecomment-723347800


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=h1) Report
   > Merging [#3102](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=desc) (40c4539) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/a260f8f58b67a300f44720cb5a115596ac77ba7e?el=desc) (a260f8f) will **decrease** coverage by `3.98%`.
   > The diff coverage is `51.57%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3102      +/-   ##
   ============================================
   - Coverage     46.03%   42.05%   -3.99%     
   + Complexity     9591     8858     -733     
   ============================================
     Files          1986     1997      +11     
     Lines         75819    76564     +745     
     Branches       8446     8521      +75     
   ============================================
   - Hits          34907    32196    -2711     
   - Misses        37632    41334    +3702     
   + Partials       3280     3034     -246     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../gobblin/runtime/CheckpointableWatermarkState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9DaGVja3BvaW50YWJsZVdhdGVybWFya1N0YXRlLmphdmE=) | `0.00% <ø> (ø)` | `0.00 <0.00> (?)` | |
   | [...org/apache/gobblin/stream/FlushRecordEnvelope.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0ZsdXNoUmVjb3JkRW52ZWxvcGUuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...in/source/extractor/extract/FlushingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc291cmNlL2V4dHJhY3Rvci9leHRyYWN0L0ZsdXNoaW5nRXh0cmFjdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...bblin/runtime/StateStoreBasedWatermarkStorage.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRhc3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9TdGF0ZVN0b3JlQmFzZWRXYXRlcm1hcmtTdG9yYWdlLmphdmE=) | `0.00% <ø> (ø)` | `0.00 <0.00> (?)` | |
   | [...org/apache/gobblin/metrics/MetricContextUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9NZXRyaWNDb250ZXh0VXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...bblin/couchbase/writer/CouchbaseWriterBuilder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY291Y2hiYXNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvdWNoYmFzZS93cml0ZXIvQ291Y2hiYXNlV3JpdGVyQnVpbGRlci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...bblin/kafka/client/GobblinKafkaConsumerClient.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9Hb2JibGluS2Fma2FDb25zdW1lckNsaWVudC5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...kunit/packer/KafkaTopicGroupingWorkUnitPacker.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS93b3JrdW5pdC9wYWNrZXIvS2Fma2FUb3BpY0dyb3VwaW5nV29ya1VuaXRQYWNrZXIuamF2YQ==) | `40.17% <40.17%> (ø)` | `11.00 <11.00> (?)` | |
   | [...tractor/extract/kafka/KafkaStreamingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVN0cmVhbWluZ0V4dHJhY3Rvci5qYXZh) | `60.48% <60.48%> (ø)` | `20.00 <20.00> (?)` | |
   | [...tractor/extract/kafka/KafkaProduceRateTracker.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVByb2R1Y2VSYXRlVHJhY2tlci5qYXZh) | `86.95% <86.95%> (ø)` | `27.00 <27.00> (?)` | |
   | ... and [201 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=footer). Last update [a260f8f...40c4539](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
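The headline percentages in this report can be reproduced from the Hits, Misses, and Partials rows (their sum equals the Lines row). A minimal sketch, assuming coverage = hits / (hits + misses + partials) and two-decimal rounding down, both of which are consistent with the figures above; the class and method names are hypothetical:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper reproducing the Codecov summary from the raw line counts. */
class CoverageMath {
  /** Coverage in percent, hits / (hits + misses + partials) * 100, kept at 10 decimal places. */
  static BigDecimal coveragePercent(long hits, long misses, long partials) {
    long lines = hits + misses + partials; // equals the "Lines" row
    return BigDecimal.valueOf(hits)
        .multiply(BigDecimal.valueOf(100))
        .divide(BigDecimal.valueOf(lines), 10, RoundingMode.HALF_EVEN);
  }

  /** Truncates to two decimals, matching the percentages shown in the report. */
  static BigDecimal display(BigDecimal percent) {
    return percent.setScale(2, RoundingMode.DOWN);
  }

  public static void main(String[] args) {
    BigDecimal master = coveragePercent(34907, 37632, 3280);
    BigDecimal pr = coveragePercent(32196, 41334, 3034);
    BigDecimal delta = pr.subtract(master);
    System.out.println(display(master));                       // 46.03
    System.out.println(display(pr));                           // 42.05
    System.out.println(delta.setScale(2, RoundingMode.DOWN));  // -3.98
    System.out.println(delta.setScale(2, RoundingMode.FLOOR)); // -3.99
  }
}
```

The exact change is about -3.989 points, so the summary's "decrease by 3.98%" and the table's -3.99% are consistent with the same value rounded toward zero and toward negative infinity, respectively. The same arithmetic reproduces the later, edited report (45.98% to 46.25%, +0.27%).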
   



[GitHub] [incubator-gobblin] asfgit closed pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102


   



[GitHub] [incubator-gobblin] sv2000 closed pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
sv2000 closed pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102


   



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r488037540



##########
File path: gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
##########
@@ -40,6 +41,7 @@ public void testKafkaReporter() {
 
     TestAppender testAppender = new TestAppender();
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.setLevel(Level.INFO);

Review comment:
       Yes. The default log level for tests appears to be WARN, although this may depend on the specific logger implementation. When running this test locally, I observed flakiness because the log level was sometimes WARN by default, so the test now sets it to INFO explicitly.
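The effect described here is easy to reproduce. The test itself uses Log4j (`LogManager`, `Level.INFO`); to keep this sketch dependency-free it swaps in `java.util.logging`, where `Level.WARNING` plays the role of Log4j's WARN. The capturing handler stands in for the test's `TestAppender`, and the logger names and message text are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Shows that an INFO message is dropped unless the logger's level is INFO or lower. */
class LogLevelSketch {
  /** Collects published messages, similar in spirit to the test's TestAppender. */
  static final class CapturingHandler extends Handler {
    final List<String> messages = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
      messages.add(record.getMessage());
    }

    @Override
    public void flush() { }

    @Override
    public void close() { }
  }

  /** Logs one INFO message through a logger set to the given level and returns what was captured. */
  static List<String> captureInfo(Level loggerLevel) {
    Logger logger = Logger.getLogger("LogLevelSketch." + loggerLevel.getName());
    logger.setUseParentHandlers(false); // keep output off the console
    CapturingHandler handler = new CapturingHandler();
    logger.addHandler(handler);
    try {
      logger.setLevel(loggerLevel); // analogous to logger.setLevel(Level.INFO) in the test
      logger.info("reporting metrics");
    } finally {
      logger.removeHandler(handler);
    }
    return handler.messages;
  }

  public static void main(String[] args) {
    System.out.println(captureInfo(Level.WARNING)); // []
    System.out.println(captureInfo(Level.INFO));    // [reporting metrics]
  }
}
```

Setting the level explicitly removes the dependence on whatever default the logging backend happens to pick up from the test classpath.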





[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r486785972



##########
File path: gobblin-core-base/build.gradle
##########
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
+  compile project(":gobblin-metastore")

Review comment:
       What is this module used for?

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
##########
@@ -0,0 +1,23 @@
+package org.apache.gobblin.metrics;

Review comment:
       The license header is missing here.

##########
File path: gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
##########
@@ -40,6 +41,7 @@ public void testKafkaReporter() {
 
     TestAppender testAppender = new TestAppender();
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.setLevel(Level.INFO);

Review comment:
       Is this needed?

##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
##########
@@ -166,6 +167,8 @@ default long committed(KafkaPartition partition) {
     return -1L;
   }
 
+  default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }

Review comment:
       Is this relevant?
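For context on this question: the diff adds `assignAndSeek(List<KafkaPartition>, Map<KafkaPartition, LongWatermark>)` as a no-op default method, so existing `GobblinKafkaConsumerClient` implementations keep compiling while a streaming-capable client can override it. The sketch below shows that pattern with hypothetical, dependency-free types (`String` partitions and `Long` offsets instead of `KafkaPartition` and `LongWatermark`); the fallback to offset 0 for partitions without a watermark is also an assumption. A client built on the Apache Kafka Java consumer would presumably implement the override with `KafkaConsumer.assign(...)` followed by `seek(...)`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified analogue of the consumer-client interface in the diff. */
interface ConsumerClientSketch {
  /** No-op by default, so implementations written before this method existed still compile. */
  default void assignAndSeek(List<String> partitions, Map<String, Long> startOffsets) { }

  /** Next offset the client will read from the partition, or -1 if it is not assigned. */
  long position(String partition);
}

/** A streaming-capable client: assigns partitions and seeks to checkpointed offsets. */
class InMemoryConsumerSketch implements ConsumerClientSketch {
  private final Map<String, Long> positions = new HashMap<>();

  @Override
  public void assignAndSeek(List<String> partitions, Map<String, Long> startOffsets) {
    positions.clear(); // a new assignment replaces the previous one
    for (String partition : partitions) {
      // Resume from the checkpointed offset if there is one; 0 is a hypothetical fallback.
      positions.put(partition, startOffsets.getOrDefault(partition, 0L));
    }
  }

  @Override
  public long position(String partition) {
    return positions.getOrDefault(partition, -1L);
  }
}

/** A pre-existing client that never overrides assignAndSeek and so inherits the no-op. */
class LegacyConsumerSketch implements ConsumerClientSketch {
  @Override
  public long position(String partition) {
    return -1L;
  }
}
```

For example, calling `assignAndSeek` with partitions `t-0` and `t-1` and a watermark of 42 for `t-0` leaves the in-memory client at offsets 42 and 0, while the legacy client silently ignores the call.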





[GitHub] [incubator-gobblin] codecov-io edited a comment on pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#issuecomment-723347800


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=h1) Report
   > Merging [#3102](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=desc) (c0a1bf9) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/5db258bb1e7c1d7f274899b9ef7d1e38674bba46?el=desc) (5db258b) will **increase** coverage by `0.27%`.
   > The diff coverage is `51.57%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3102      +/-   ##
   ============================================
   + Coverage     45.98%   46.25%   +0.27%     
   - Complexity     9598     9708     +110     
   ============================================
     Files          1993     2001       +8     
     Lines         75995    76658     +663     
     Branches       8462     8531      +69     
   ============================================
   + Hits          34945    35461     +516     
   - Misses        37783    37904     +121     
   - Partials       3267     3293      +26     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../gobblin/runtime/CheckpointableWatermarkState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9DaGVja3BvaW50YWJsZVdhdGVybWFya1N0YXRlLmphdmE=) | `0.00% <ø> (ø)` | `0.00 <0.00> (?)` | |
   | [...org/apache/gobblin/stream/FlushRecordEnvelope.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0ZsdXNoUmVjb3JkRW52ZWxvcGUuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...in/source/extractor/extract/FlushingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc291cmNlL2V4dHJhY3Rvci9leHRyYWN0L0ZsdXNoaW5nRXh0cmFjdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...bblin/runtime/StateStoreBasedWatermarkStorage.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRhc3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9TdGF0ZVN0b3JlQmFzZWRXYXRlcm1hcmtTdG9yYWdlLmphdmE=) | `0.00% <ø> (ø)` | `0.00 <0.00> (?)` | |
   | [...org/apache/gobblin/metrics/MetricContextUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9NZXRyaWNDb250ZXh0VXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...bblin/couchbase/writer/CouchbaseWriterBuilder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY291Y2hiYXNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvdWNoYmFzZS93cml0ZXIvQ291Y2hiYXNlV3JpdGVyQnVpbGRlci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...bblin/kafka/client/GobblinKafkaConsumerClient.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9Hb2JibGluS2Fma2FDb25zdW1lckNsaWVudC5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...kunit/packer/KafkaTopicGroupingWorkUnitPacker.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS93b3JrdW5pdC9wYWNrZXIvS2Fma2FUb3BpY0dyb3VwaW5nV29ya1VuaXRQYWNrZXIuamF2YQ==) | `40.17% <40.17%> (ø)` | `11.00 <11.00> (?)` | |
   | [...tractor/extract/kafka/KafkaStreamingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVN0cmVhbWluZ0V4dHJhY3Rvci5qYXZh) | `60.48% <60.48%> (ø)` | `20.00 <20.00> (?)` | |
   | [...tractor/extract/kafka/KafkaProduceRateTracker.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVByb2R1Y2VSYXRlVHJhY2tlci5qYXZh) | `86.95% <86.95%> (ø)` | `27.00 <27.00> (?)` | |
   | ... and [30 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=footer). Last update [5db258b...c0a1bf9](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r486785972



##########
File path: gobblin-core-base/build.gradle
##########
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
+  compile project(":gobblin-metastore")

Review comment:
       What is this module used for ? 

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
##########
@@ -0,0 +1,23 @@
+package org.apache.gobblin.metrics;

Review comment:
       License header missing here

##########
File path: gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
##########
@@ -40,6 +41,7 @@ public void testKafkaReporter() {
 
     TestAppender testAppender = new TestAppender();
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.setLevel(Level.INFO);

Review comment:
       is this needed? 

##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
##########
@@ -166,6 +167,8 @@ default long committed(KafkaPartition partition) {
     return -1L;
   }
 
+  default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }

Review comment:
       is this relevant ?

##########
File path: gobblin-core-base/build.gradle
##########
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
+  compile project(":gobblin-metastore")

Review comment:
       What is this module used for ? 

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
##########
@@ -0,0 +1,23 @@
+package org.apache.gobblin.metrics;

Review comment:
       License header missing here

##########
File path: gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
##########
@@ -40,6 +41,7 @@ public void testKafkaReporter() {
 
     TestAppender testAppender = new TestAppender();
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.setLevel(Level.INFO);

Review comment:
       is this needed? 

##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
##########
@@ -166,6 +167,8 @@ default long committed(KafkaPartition partition) {
     return -1L;
   }
 
+  default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }

Review comment:
       is this relevant ?





----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r486785972



##########
File path: gobblin-core-base/build.gradle
##########
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
+  compile project(":gobblin-metastore")

Review comment:
       What is this module used for?

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
##########
@@ -0,0 +1,23 @@
+package org.apache.gobblin.metrics;

Review comment:
       The license header is missing here.

##########
File path: gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
##########
@@ -40,6 +41,7 @@ public void testKafkaReporter() {
 
     TestAppender testAppender = new TestAppender();
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.setLevel(Level.INFO);

Review comment:
       Is this needed?

##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
##########
@@ -166,6 +167,8 @@ default long committed(KafkaPartition partition) {
     return -1L;
   }
 
+  default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }

Review comment:
       Is this relevant?








[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#discussion_r488036758



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
##########
@@ -166,6 +167,8 @@ default long committed(KafkaPartition partition) {
     return -1L;
   }
 
+  default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }

Review comment:
       Yes, it is. This method is used inside LI's internal Kafka consumer client.
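The diff adds `assignAndSeek` as a no-op default method, so existing `GobblinKafkaConsumerClient` implementations keep compiling while a client that needs the hook (per the reply, LI's internal consumer client) can override it. A minimal sketch of that pattern follows. `KafkaPartition` and `LongWatermark` here are hypothetical simplified stand-ins for the Gobblin classes, `ConsumerClient` is a trimmed-down hypothetical interface, and `InMemoryConsumerClient` records positions instead of talking to Kafka. A real override would presumably call the Kafka consumer's `assign` and `seek` methods, but the thread does not say so.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Gobblin's KafkaPartition: topic name plus partition id.
final class KafkaPartition {
  private final String topicName;
  private final int id;

  KafkaPartition(String topicName, int id) {
    this.topicName = topicName;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof KafkaPartition)) return false;
    KafkaPartition other = (KafkaPartition) o;
    return id == other.id && topicName.equals(other.topicName);
  }

  @Override
  public int hashCode() { return Objects.hash(topicName, id); }
}

// Hypothetical stand-in for Gobblin's LongWatermark: wraps an offset.
final class LongWatermark {
  private final long value;
  LongWatermark(long value) { this.value = value; }
  long getValue() { return value; }
}

// Hypothetical trimmed-down client interface carrying only the new default method.
interface ConsumerClient {
  // No-op by default, mirroring the diff, so implementers are not forced to override it.
  default void assignAndSeek(List<KafkaPartition> topicPartitions,
                             Map<KafkaPartition, LongWatermark> topicWatermarksMap) { }
}

// Client that relies on the default no-op.
class NoOpConsumerClient implements ConsumerClient { }

// Hypothetical client that overrides the hook and records where each partition would start.
class InMemoryConsumerClient implements ConsumerClient {
  final List<KafkaPartition> assigned = new ArrayList<>();
  final Map<KafkaPartition, Long> positions = new HashMap<>();

  @Override
  public void assignAndSeek(List<KafkaPartition> topicPartitions,
                            Map<KafkaPartition, LongWatermark> topicWatermarksMap) {
    assigned.clear();
    assigned.addAll(topicPartitions);
    positions.clear();
    for (KafkaPartition partition : topicPartitions) {
      LongWatermark watermark = topicWatermarksMap.get(partition);
      // Assumption for this sketch: partitions without a watermark start at offset 0.
      positions.put(partition, watermark == null ? 0L : watermark.getValue());
    }
  }
}
```

Keeping the method as an interface default is what lets the open-source clients ignore it entirely.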







[GitHub] [incubator-gobblin] codecov-io edited a comment on pull request #3102: GOBBLIN-1259: Implement a Kafka streaming extractor

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #3102:
URL: https://github.com/apache/incubator-gobblin/pull/3102#issuecomment-723347800


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=h1) Report
   > Merging [#3102](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=desc) (f30996e) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/76658e4bf20ec15ecec6f8947f648a64ad058a92?el=desc) (76658e4) will **decrease** coverage by `0.06%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##             master   #3102      +/-   ##
   ===========================================
   - Coverage      9.21%   9.14%   -0.07%     
   - Complexity     1724    1726       +2     
   ===========================================
     Files          1998    2006       +8     
     Lines         76191   76854     +663     
     Branches       8478    8547      +69     
   ===========================================
   + Hits           7020    7029       +9     
   - Misses        68486   69143     +657     
   + Partials        685     682       -3     
   ```
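The header reports a `0.06%` decrease while the diff table shows `-0.07%`. Both figures can be reproduced from the Hits and Lines rows, assuming coverage is hits divided by lines (where lines = hits + misses + partials, which matches both columns) and that the report truncates rather than rounds to two decimals. The truncation is an inference from these numbers, not something the report states. A small Java sketch; `CoverageDelta` is a hypothetical name:

```java
import java.util.Locale;

// Recomputes the coverage figures above from the table's Hits/Misses/Partials rows.
class CoverageDelta {
  // Total tracked lines: every line is a hit, a miss, or a partial.
  static long totalLines(long hits, long misses, long partials) {
    return hits + misses + partials;
  }

  // Coverage in percent: hits / lines * 100.
  static double coveragePercent(long hits, long lines) {
    return 100.0 * hits / lines;
  }

  // Keeps two decimal places by dropping the remainder (no rounding).
  static double truncate2(double value) {
    return Math.floor(value * 100.0) / 100.0;
  }

  public static void main(String[] args) {
    double base = coveragePercent(7020, totalLines(7020, 68486, 685)); // master, ~9.2137
    double head = coveragePercent(7029, totalLines(7029, 69143, 682)); // #3102, ~9.1459
    double delta = base - head;                                        // ~0.0678 points

    // Truncated values match the table (9.21 and 9.14, a 0.07 gap);
    // the truncated exact delta matches the header (0.06).
    System.out.printf(Locale.ROOT, "base=%.2f head=%.2f%n", truncate2(base), truncate2(head));
    System.out.printf(Locale.ROOT, "delta truncated=%.2f rounded=%.2f%n", truncate2(delta), delta);
  }
}
```

Under this reading the table subtracts the already-truncated percentages (9.21 - 9.14), while the header truncates the exact difference.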
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../gobblin/runtime/CheckpointableWatermarkState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9DaGVja3BvaW50YWJsZVdhdGVybWFya1N0YXRlLmphdmE=) | `0.00% <ø> (ø)` | `0.00 <0.00> (?)` | |
   | [...org/apache/gobblin/stream/FlushRecordEnvelope.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0ZsdXNoUmVjb3JkRW52ZWxvcGUuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...in/source/extractor/extract/FlushingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc291cmNlL2V4dHJhY3Rvci9leHRyYWN0L0ZsdXNoaW5nRXh0cmFjdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...bblin/runtime/StateStoreBasedWatermarkStorage.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRhc3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9TdGF0ZVN0b3JlQmFzZWRXYXRlcm1hcmtTdG9yYWdlLmphdmE=) | `0.00% <ø> (ø)` | `0.00 <0.00> (?)` | |
   | [...org/apache/gobblin/metrics/MetricContextUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9NZXRyaWNDb250ZXh0VXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...bblin/couchbase/writer/CouchbaseWriterBuilder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY291Y2hiYXNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvdWNoYmFzZS93cml0ZXIvQ291Y2hiYXNlV3JpdGVyQnVpbGRlci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...bblin/kafka/client/GobblinKafkaConsumerClient.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9Hb2JibGluS2Fma2FDb25zdW1lckNsaWVudC5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...source/extractor/extract/kafka/KafkaPartition.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVBhcnRpdGlvbi5qYXZh) | `32.14% <0.00%> (ø)` | `1.00 <0.00> (ø)` | |
   | [...tractor/extract/kafka/KafkaProduceRateTracker.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVByb2R1Y2VSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...tractor/extract/kafka/KafkaStreamingExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9rYWZrYS9LYWZrYVN0cmVhbWluZ0V4dHJhY3Rvci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | ... and [14 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3102/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=footer). Last update [76658e4...f30996e](https://codecov.io/gh/apache/incubator-gobblin/pull/3102?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

