Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/02/19 18:37:40 UTC
[GitHub] [gobblin] vikrambohra opened a new pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
vikrambohra opened a new pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231
…chema in schema registry
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
### JIRA
- [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-1391] My Gobblin PR"
- https://issues.apache.org/jira/browse/GOBBLIN-1391
### Description
- [ ] Here are some details about my PR, including screenshots (if applicable):
- Add a schema registry to the Gobblin Kafka consumer client.
- Filter out Kafka topics that have no schema in the schema registry.
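
The filtering described above could be sketched roughly as follows. This is an illustration only, not the code in this PR: the class `TopicSchemaFilter`, the `survived` helper (a simplified stand-in for the client's blacklist/whitelist check), and the `hasSchema` predicate (standing in for a schema registry lookup) are all hypothetical names, and topics are plain strings rather than `KafkaTopic` objects.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: keeps topics that pass the blacklist/whitelist
// check AND for which a schema lookup succeeds.
final class TopicSchemaFilter {
  private TopicSchemaFilter() {}

  static List<String> filterTopics(List<String> topics, List<Pattern> blacklist,
      List<Pattern> whitelist, Predicate<String> hasSchema) {
    return topics.stream()
        .filter(name -> survived(name, blacklist, whitelist))
        .filter(hasSchema) // assumed to wrap a schema registry query
        .collect(Collectors.toList());
  }

  // Simplified assumption: an empty whitelist admits every topic, and a
  // blacklist match always excludes. Patterns must match the full name.
  static boolean survived(String name, List<Pattern> blacklist, List<Pattern> whitelist) {
    boolean whitelisted = whitelist.isEmpty() || matchesAny(name, whitelist);
    return whitelisted && !matchesAny(name, blacklist);
  }

  private static boolean matchesAny(String name, List<Pattern> patterns) {
    return patterns.stream().anyMatch(p -> p.matcher(name).matches());
  }
}
```

If no schema registry is configured, a caller could pass `name -> true` as `hasSchema` so that only the blacklist/whitelist check applies.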
### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
- Tested in LinkedIn holdem.
### Commits
- [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not "adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [gobblin] vikrambohra commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
vikrambohra commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579405643
##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -54,13 +58,14 @@
protected final int fetchTimeoutMillis;
protected final int fetchMinBytes;
protected final int socketTimeoutMillis;
+ protected final KafkaSchemaRegistry schemaRegistry;
public AbstractBaseKafkaConsumerClient(Config config) {
this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
if (this.brokers.isEmpty()) {
throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
}
-
+ this.schemaRegistry = KafkaSchemaRegistry.get(ConfigUtils.configToProperties(config));
Review comment:
1. There is a check for the schema registry class inside the KafkaSchemaRegistry.get method:
public static <K, S> KafkaSchemaRegistry<K, S> get(Properties props) {
Preconditions.checkArgument(props.containsKey(KAFKA_SCHEMA_REGISTRY_CLASS),
"Missing required property " + KAFKA_SCHEMA_REGISTRY_CLASS);
...
}
2. I am assuming we should also initialize it lazily, and only once, right?
[GitHub] [gobblin] sv2000 commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579403158
##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -54,13 +58,14 @@
protected final int fetchTimeoutMillis;
protected final int fetchMinBytes;
protected final int socketTimeoutMillis;
+ protected final KafkaSchemaRegistry schemaRegistry;
public AbstractBaseKafkaConsumerClient(Config config) {
this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
if (this.brokers.isEmpty()) {
throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
}
-
+ this.schemaRegistry = KafkaSchemaRegistry.get(ConfigUtils.configToProperties(config));
Review comment:
Also - can we do a lazy instantiation of KafkaSchemaRegistry inside the getFilteredTopics() method? We only need the schema registry check for filtering the list of topics.
##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -54,13 +58,14 @@
protected final int fetchTimeoutMillis;
protected final int fetchMinBytes;
protected final int socketTimeoutMillis;
+ protected final KafkaSchemaRegistry schemaRegistry;
public AbstractBaseKafkaConsumerClient(Config config) {
this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
if (this.brokers.isEmpty()) {
throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
}
-
+ this.schemaRegistry = KafkaSchemaRegistry.get(ConfigUtils.configToProperties(config));
Review comment:
What happens when Kafka Schema Registry is not enabled? Shouldn't we check if the config kafka.schema.registry.url is set?
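
A minimal sketch of the kind of guard this comment asks about, assuming the registry counts as enabled only when both a registry class and a registry URL are set. The key `kafka.schema.registry.url` is quoted from the comment; the key `kafka.schema.registry.class`, the class name, and the method name are assumptions for illustration, and plain `java.util.Properties` stands in for Gobblin's `Config`.

```java
import java.util.Properties;

// Hypothetical guard: reports whether enough configuration is present
// to attempt creating a schema registry client at all.
final class SchemaRegistryConfigCheck {
  // Assumed key name for the registry implementation class.
  static final String REGISTRY_CLASS_KEY = "kafka.schema.registry.class";
  // Key name as quoted in the review comment.
  static final String REGISTRY_URL_KEY = "kafka.schema.registry.url";

  private SchemaRegistryConfigCheck() {}

  static boolean isSchemaRegistryConfigured(Properties props) {
    return hasText(props.getProperty(REGISTRY_CLASS_KEY))
        && hasText(props.getProperty(REGISTRY_URL_KEY));
  }

  // Treats missing and blank values the same way.
  private static boolean hasText(String value) {
    return value != null && !value.trim().isEmpty();
  }
}
```

With a guard like this, a client constructed without registry settings would skip registry creation instead of failing the precondition inside `KafkaSchemaRegistry.get`.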
[GitHub] [gobblin] asfgit closed pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231
[GitHub] [gobblin] codecov-io commented on pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#issuecomment-782276721
# [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=h1) Report
> Merging [#3231](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=desc) (8239295) into [master](https://codecov.io/gh/apache/gobblin/commit/f3f8a36559c7118f55c69f70f35294ba4a90a03d?el=desc) (f3f8a36) will **decrease** coverage by `37.33%`.
> The diff coverage is `0.00%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3231/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #3231 +/- ##
============================================
- Coverage 46.37% 9.03% -37.34%
+ Complexity 9931 1737 -8194
============================================
Files 2030 2030
Lines 78718 78728 +10
Branches 8754 8755 +1
============================================
- Hits 36506 7114 -29392
- Misses 38812 70921 +32109
+ Partials 3400 693 -2707
```
| [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../kafka/client/AbstractBaseKafkaConsumerClient.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9BYnN0cmFjdEJhc2VLYWZrYUNvbnN1bWVyQ2xpZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
| [...c/main/java/org/apache/gobblin/util/FileUtils.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvRmlsZVV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...java/org/apache/gobblin/stream/ControlMessage.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0NvbnRyb2xNZXNzYWdlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...va/org/apache/gobblin/dataset/DatasetResolver.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YXNldC9EYXRhc2V0UmVzb2x2ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...va/org/apache/gobblin/converter/EmptyIterable.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9FbXB0eUl0ZXJhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...org/apache/gobblin/ack/BasicAckableForTesting.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vYWNrL0Jhc2ljQWNrYWJsZUZvclRlc3RpbmcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...n/java/org/apache/gobblin/salesforce/SfConfig.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2ZDb25maWcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/gobblin/yarn/HelixMessageSubTypes.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vSGVsaXhNZXNzYWdlU3ViVHlwZXMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...va/org/apache/gobblin/cluster/SingleHelixTask.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU2luZ2xlSGVsaXhUYXNrLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| ... and [1070 more](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=footer). Last update [f3f8a36...8239295](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [gobblin] sv2000 commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579457718
##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -75,14 +81,30 @@ public AbstractBaseKafkaConsumerClient(Config config) {
@Override
public List<KafkaTopic> getFilteredTopics(final List<Pattern> blacklist, final List<Pattern> whitelist) {
+ final Optional<KafkaSchemaRegistry> schemaRegistry = (config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) && config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL)) ? Optional.of(KafkaSchemaRegistry.get(ConfigUtils.configToProperties(this.config))) : Optional.absent();
Review comment:
Can we cache the schema registry into a member variable and avoid re-initializing it each time it is needed? Also - you can move the check for schema registry existence into a separate method.
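
One way to read this suggestion is sketched below, as an assumption rather than the change that was eventually made: the registry is created on first use, stored in a member, and the existence check lives in its own method. `LazyRegistryHolder`, the `factory` parameter (standing in for `KafkaSchemaRegistry.get`), and the property key names are hypothetical, and `java.util.Optional` is used here where the diff uses Guava's `Optional`.

```java
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;

// Hypothetical holder that builds the registry at most once, on demand.
final class LazyRegistryHolder<R> {
  // Assumed property keys for the registry class and URL.
  static final String REGISTRY_CLASS_KEY = "kafka.schema.registry.class";
  static final String REGISTRY_URL_KEY = "kafka.schema.registry.url";

  private final Properties props;
  private final Function<Properties, R> factory;
  private boolean initialized; // true once a lookup has completed
  private R registry;          // stays null when nothing is configured

  LazyRegistryHolder(Properties props, Function<Properties, R> factory) {
    this.props = props;
    this.factory = factory;
  }

  // The existence check, kept separate from the creation logic.
  boolean isSchemaRegistryConfigured() {
    return props.containsKey(REGISTRY_CLASS_KEY) && props.containsKey(REGISTRY_URL_KEY);
  }

  // Creates the registry on the first call and reuses it afterwards.
  // If the factory throws, nothing is cached and the next call retries.
  synchronized Optional<R> get() {
    if (!initialized) {
      if (isSchemaRegistryConfigured()) {
        registry = factory.apply(props);
      }
      initialized = true;
    }
    return Optional.ofNullable(registry);
  }
}
```

Synchronizing `get()` keeps the sketch simple; whether the client is actually called from multiple threads is not stated in this thread.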
[GitHub] [gobblin] sv2000 commented on a change in pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#discussion_r579857621
##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
##########
@@ -78,11 +85,33 @@ public AbstractBaseKafkaConsumerClient(Config config) {
return Lists.newArrayList(Iterables.filter(getTopics(), new Predicate<KafkaTopic>() {
@Override
public boolean apply(@Nonnull KafkaTopic kafkaTopic) {
- return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, whitelist);
+ return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, whitelist) && isSchemaPresent(kafkaTopic.getName());
}
}));
}
+ private boolean isSchemaRegistryPresent() {
Review comment:
Nit: would recommend renaming the method to isSchemaRegistryConfigured.
[GitHub] [gobblin] codecov-io edited a comment on pull request #3231: [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #3231:
URL: https://github.com/apache/gobblin/pull/3231#issuecomment-782276721
# [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=h1) Report
> Merging [#3231](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=desc) (280d7f9) into [master](https://codecov.io/gh/apache/gobblin/commit/f3f8a36559c7118f55c69f70f35294ba4a90a03d?el=desc) (f3f8a36) will **decrease** coverage by `3.51%`.
> The diff coverage is `0.00%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3231/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #3231 +/- ##
============================================
- Coverage 46.37% 42.85% -3.52%
+ Complexity 9931 9209 -722
============================================
Files 2030 2030
Lines 78718 78730 +12
Branches 8754 8756 +2
============================================
- Hits 36506 33743 -2763
- Misses 38812 41777 +2965
+ Partials 3400 3210 -190
```
| [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../kafka/client/AbstractBaseKafkaConsumerClient.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL2NsaWVudC9BYnN0cmFjdEJhc2VLYWZrYUNvbnN1bWVyQ2xpZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
| [.../org/apache/gobblin/util/filters/HiddenFilter.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsdGVycy9IaWRkZW5GaWx0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...g/apache/gobblin/cluster/HelixMessageSubTypes.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhNZXNzYWdlU3ViVHlwZXMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...gobblin/runtime/mapreduce/GobblinOutputFormat.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWFwcmVkdWNlL0dvYmJsaW5PdXRwdXRGb3JtYXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...obblin/compaction/source/CompactionFailedTask.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vc291cmNlL0NvbXBhY3Rpb25GYWlsZWRUYXNrLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...n/cluster/event/ClusterManagerShutdownRequest.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2x1c3Rlck1hbmFnZXJTaHV0ZG93blJlcXVlc3QuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...in/compaction/action/CompactionCompleteAction.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vYWN0aW9uL0NvbXBhY3Rpb25Db21wbGV0ZUFjdGlvbi5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...n/compaction/mapreduce/orc/OrcKeyDedupReducer.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNLZXlEZWR1cFJlZHVjZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...action/audit/KafkaAuditCountHttpClientFactory.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vYXVkaXQvS2Fma2FBdWRpdENvdW50SHR0cENsaWVudEZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...askStateCollectorServiceHiveRegHandlerFactory.java](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvVGFza1N0YXRlQ29sbGVjdG9yU2VydmljZUhpdmVSZWdIYW5kbGVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| ... and [140 more](https://codecov.io/gh/apache/gobblin/pull/3231/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=footer). Last update [f3f8a36...280d7f9](https://codecov.io/gh/apache/gobblin/pull/3231?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).