Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/02/19 18:37:40 UTC

[GitHub] [gobblin] vikrambohra opened a new pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

vikrambohra opened a new pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231


   …chema in schema registry
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-1391] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1391
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   - Add a schema registry to the Gobblin Kafka consumer client.
   - Filter out Kafka topics that have no schema in the schema registry.
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   - Tested in LinkedIn holdem.
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] vikrambohra commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
vikrambohra commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579405643



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -54,13 +58,14 @@
   protected final int fetchTimeoutMillis;
   protected final int fetchMinBytes;
   protected final int socketTimeoutMillis;
+  protected final KafkaSchemaRegistry schemaRegistry;
 
   public AbstractBaseKafkaConsumerClient(Config config) {
     this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
     if (this.brokers.isEmpty()) {
       throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
     }
-
+    this.schemaRegistry = KafkaSchemaRegistry.get(ConfigUtils.configToProperties(config));

Review comment:
       1. There is a check for the schema registry class inside the KafkaSchemaRegistry.get method:
            public static <K, S> KafkaSchemaRegistry<K, S> get(Properties props) {
              Preconditions.checkArgument(props.containsKey(KAFKA_SCHEMA_REGISTRY_CLASS),
                  "Missing required property " + KAFKA_SCHEMA_REGISTRY_CLASS);
              ...
            }
       2. I assume we should initialize it only once, and lazily too, right?







[GitHub] [gobblin] sv2000 commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579403158



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -54,13 +58,14 @@
   protected final int fetchTimeoutMillis;
   protected final int fetchMinBytes;
   protected final int socketTimeoutMillis;
+  protected final KafkaSchemaRegistry schemaRegistry;
 
   public AbstractBaseKafkaConsumerClient(Config config) {
     this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
     if (this.brokers.isEmpty()) {
       throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
     }
-
+    this.schemaRegistry = KafkaSchemaRegistry.get(ConfigUtils.configToProperties(config));

Review comment:
       Also - can we do a lazy instantiation of KafkaSchemaRegistry inside the getFilteredTopics() method? We only need the schema registry check for filtering the list of topics.
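
A minimal sketch of the lazy instantiation being asked for, assuming a generic holder rather than Gobblin's actual classes. `LazyRegistryHolder` and its `creations()` counter are hypothetical names introduced only for illustration; the factory function stands in for a call such as `KafkaSchemaRegistry.get(props)`.

```java
import java.util.Properties;
import java.util.function.Function;

/** Hypothetical holder that builds a registry client on first use and then reuses it. */
final class LazyRegistryHolder<R> {
  private final Properties props;
  private final Function<Properties, R> factory; // stand-in for a registry factory method
  private R registry;                            // stays null until get() is first called
  private int creations = 0;                     // demonstration only: counts factory calls

  LazyRegistryHolder(Properties props, Function<Properties, R> factory) {
    this.props = props;
    this.factory = factory;
  }

  /** Creates the registry on the first call; later calls return the cached instance. */
  synchronized R get() {
    if (registry == null) {
      registry = factory.apply(props);
      creations++;
    }
    return registry;
  }

  synchronized int creations() {
    return creations;
  }
}
```

With this shape the constructor does no registry work, so a client that never filters topics never touches the registry. The `synchronized` methods are one simple way to keep the single-initialization guarantee if the client is shared across threads.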

##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -54,13 +58,14 @@
   protected final int fetchTimeoutMillis;
   protected final int fetchMinBytes;
   protected final int socketTimeoutMillis;
+  protected final KafkaSchemaRegistry schemaRegistry;
 
   public AbstractBaseKafkaConsumerClient(Config config) {
     this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
     if (this.brokers.isEmpty()) {
       throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
     }
-
+    this.schemaRegistry = KafkaSchemaRegistry.get(ConfigUtils.configToProperties(config));

Review comment:
       What happens when Kafka Schema Registry is not enabled? Shouldn't we check if the config kafka.schema.registry.url is set?
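
The guard suggested here could look roughly like the following sketch. It uses plain `java.util.Properties` and `java.util.Optional` so that it runs without Gobblin on the classpath. The URL key is the one named in the comment above; the class key string, the class `RegistryConfigCheck`, and both method names are assumptions made for illustration.

```java
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;

final class RegistryConfigCheck {
  // Key named in the review comment.
  static final String REGISTRY_URL_KEY = "kafka.schema.registry.url";
  // Assumed key for the registry implementation class; check the real constant before relying on it.
  static final String REGISTRY_CLASS_KEY = "kafka.schema.registry.class";

  /** True only when both the registry class and the registry URL are set. */
  static boolean isRegistryConfigured(Properties props) {
    return props.containsKey(REGISTRY_CLASS_KEY) && props.containsKey(REGISTRY_URL_KEY);
  }

  /** Builds the registry through the supplied factory only when it is configured. */
  static <R> Optional<R> createIfConfigured(Properties props, Function<Properties, R> factory) {
    return isRegistryConfigured(props) ? Optional.of(factory.apply(props)) : Optional.empty();
  }
}
```

Returning an empty `Optional` lets callers skip the schema check when no registry is enabled, instead of failing in the constructor with the "Missing required property" precondition.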







[GitHub] [gobblin] asfgit closed pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231


   





[GitHub] [gobblin] codecov-io commented on pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#issuecomment-782276721


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=h1) Report
   > Merging [#3231](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=desc) (8239295) into [master](https://codecov.io/gh/apache/gobblin/commit/f3f8a36559c7118f55c69f70f35294ba4a90a03d?el=desc) (f3f8a36) will **decrease** coverage by `37.33%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3231/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3231       +/-   ##
   ============================================
   - Coverage     46.37%   9.03%   -37.34%     
   + Complexity     9931    1737     -8194     
   ============================================
     Files          2030    2030               
     Lines         78718   78728       +10     
     Branches       8754    8755        +1     
   ============================================
   - Hits          36506    7114    -29392     
   - Misses        38812   70921    +32109     
   + Partials       3400     693     -2707     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../kafka/client/AbstractBaseKafkaConsumerClient.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9BYnN0cmFjdEJhc2VLYWZrYUNvbnN1bWVyQ2xpZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...c/main/java/org/apache/gobblin/util/FileUtils.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvRmlsZVV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...java/org/apache/gobblin/stream/ControlMessage.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0NvbnRyb2xNZXNzYWdlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...va/org/apache/gobblin/dataset/DatasetResolver.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YXNldC9EYXRhc2V0UmVzb2x2ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...va/org/apache/gobblin/converter/EmptyIterable.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9FbXB0eUl0ZXJhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...org/apache/gobblin/ack/BasicAckableForTesting.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vYWNrL0Jhc2ljQWNrYWJsZUZvclRlc3RpbmcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...n/java/org/apache/gobblin/salesforce/SfConfig.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2ZDb25maWcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/gobblin/yarn/HelixMessageSubTypes.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vSGVsaXhNZXNzYWdlU3ViVHlwZXMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...va/org/apache/gobblin/cluster/SingleHelixTask.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU2luZ2xlSGVsaXhUYXNrLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | ... and [1070 more](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=footer). Last update [f3f8a36...8239295](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [gobblin] sv2000 commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579457718



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -75,14 +81,30 @@ public AbstractBaseKafkaConsumerClient(Config config) {
 
   @Override
   public List<KafkaTopic> getFilteredTopics(final List<Pattern> blacklist, final List<Pattern> whitelist) {
+    final Optional<KafkaSchemaRegistry> schemaRegistry = (config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) && config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL)) ? Optional.of(KafkaSchemaRegistry.get(ConfigUtils.configToProperties(this.config))) : Optional.absent();

Review comment:
       Can we cache the schema registry in a member variable and avoid re-initializing it each time it is needed? Also - you can move the check for schema registry existence into a separate method.







[GitHub] [gobblin] sv2000 commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579857621



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -78,11 +85,33 @@ public AbstractBaseKafkaConsumerClient(Config config) {
     return Lists.newArrayList(Iterables.filter(getTopics(), new Predicate<KafkaTopic>() {
       @Override
       public boolean apply(@Nonnull KafkaTopic kafkaTopic) {
-        return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, whitelist);
+        return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, whitelist) && isSchemaPresent(kafkaTopic.getName());
       }
     }));
   }
 
+  private boolean isSchemaRegistryPresent() {

Review comment:
       Nit: would recommend renaming the method to isSchemaRegistryConfigured.
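
For readers following the diff, the combined filter in this hunk can be sketched in isolation as below. This is not the PR's code: `TopicFilterSketch` is a hypothetical class, topics are plain strings, the `survived` predicate stands in for the blacklist/whitelist check, and a set of topic names stands in for the schema registry lookup. Treating an unconfigured registry as "schema present" is an assumption; the thread does not show what the PR does in that case.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class TopicFilterSketch {
  /**
   * Keeps topics that pass the name filter and, when a registry view is available,
   * also have a schema. With no registry, the schema check is skipped (assumption).
   */
  static List<String> filter(List<String> topics, Predicate<String> survived,
      Optional<Set<String>> topicsWithSchema) {
    return topics.stream()
        .filter(survived)
        .filter(t -> topicsWithSchema.map(s -> s.contains(t)).orElse(true))
        .collect(Collectors.toList());
  }
}
```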







[GitHub] [gobblin] codecov-io edited a comment on pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#issuecomment-782276721


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=h1) Report
   > Merging [#3231](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=desc) (280d7f9) into [master](https://codecov.io/gh/apache/gobblin/commit/f3f8a36559c7118f55c69f70f35294ba4a90a03d?el=desc) (f3f8a36) will **decrease** coverage by `3.51%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3231/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3231      +/-   ##
   ============================================
   - Coverage     46.37%   42.85%   -3.52%     
   + Complexity     9931     9209     -722     
   ============================================
     Files          2030     2030              
     Lines         78718    78730      +12     
     Branches       8754     8756       +2     
   ============================================
   - Hits          36506    33743    -2763     
   - Misses        38812    41777    +2965     
   + Partials       3400     3210     -190     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../kafka/client/AbstractBaseKafkaConsumerClient.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9BYnN0cmFjdEJhc2VLYWZrYUNvbnN1bWVyQ2xpZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../org/apache/gobblin/util/filters/HiddenFilter.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsdGVycy9IaWRkZW5GaWx0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...g/apache/gobblin/cluster/HelixMessageSubTypes.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhNZXNzYWdlU3ViVHlwZXMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...gobblin/runtime/mapreduce/GobblinOutputFormat.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWFwcmVkdWNlL0dvYmJsaW5PdXRwdXRGb3JtYXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...obblin/compaction/source/CompactionFailedTask.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vc291cmNlL0NvbXBhY3Rpb25GYWlsZWRUYXNrLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...n/cluster/event/ClusterManagerShutdownRequest.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2x1c3Rlck1hbmFnZXJTaHV0ZG93blJlcXVlc3QuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...in/compaction/action/CompactionCompleteAction.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vYWN0aW9uL0NvbXBhY3Rpb25Db21wbGV0ZUFjdGlvbi5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...n/compaction/mapreduce/orc/OrcKeyDedupReducer.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNLZXlEZWR1cFJlZHVjZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...action/audit/KafkaAuditCountHttpClientFactory.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vYXVkaXQvS2Fma2FBdWRpdENvdW50SHR0cENsaWVudEZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...askStateCollectorServiceHiveRegHandlerFactory.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvVGFza1N0YXRlQ29sbGVjdG9yU2VydmljZUhpdmVSZWdIYW5kbGVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | ... and [140 more](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=footer). Last update [f3f8a36...280d7f9](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




