Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/06 16:58:56 UTC
[GitHub] [pulsar] aymkhalil opened a new pull request, #17902: [improve][fn] Run connectors extraction in parallel
aymkhalil opened a new pull request, #17902:
URL: https://github.com/apache/pulsar/pull/17902
### Motivation
Improve functions worker bootstrap time when the number of connectors grows. Currently, all NAR files are unpacked during startup to perform validation steps (e.g. checking that the connector class implements the correct interfaces). To do so, the NarClassLoader unpacks all NARs to disk sequentially, which can delay startup.
On my machine (Mac M1) I observed an improvement of about 3x (from 48 seconds down to 15 seconds). I know the maximum thread count (4) is a magic number (I have not seen improvements beyond it), but if this is the right approach and worth it, we can make the number configurable (I need guidance on the right place for the config).
Sample output:
```
2022-09-30T01:22:13,778+0000 [main] INFO org.apache.pulsar.functions.utils.io.ConnectorUtils - Searching for connectors in /pulsar/./connectors
2022-09-30T01:22:13,779+0000 [main] INFO org.apache.pulsar.functions.utils.io.ConnectorUtils - Loading 58 connector definitions with a thread pool of size 4
2022-09-30T01:22:13,781+0000 [search-connectors-executor-0] INFO org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-io-debezium-mssql-2.10.1.8-SNAPSHOT.nar-unpacked
2022-09-30T01:22:13,781+0000 [search-connectors-executor-1] INFO org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-snowflake-connector-0.1.10.nar-unpacked
2022-09-30T01:22:13,781+0000 [search-connectors-executor-2] INFO org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-io-mongo-2.10.1.8-SNAPSHOT.nar-unpacked
2022-09-30T01:22:13,784+0000 [search-connectors-executor-3] INFO org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-io-kinesis-2.10.1.8-SNAPSHOT.nar-unpacked
2022-09-30T01:22:13,895+0000 [search-connectors-executor-2] INFO org.apache.pulsar.common.nar.NarUnpacker - Extracting /pulsar/./connectors/pulsar-io-mongo-2.10.1.8-SNAPSHOT.nar to /tmp/pulsar-nar/pulsar-io-mongo-2.10.1.8-SNAPSHOT.nar-unpacked/gs7wF7XmmgT1MjvpwSxFfA
```
### Modifications
Just run the connectors search function (which unpacks the nars) in parallel via a CachedThreadPool.
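As a rough illustration of the approach, the sketch below fans per-archive work out to a short-lived thread pool and merges the results into a `TreeMap` on the calling thread. It assumes a fixed-size pool, as in the diff quoted later in this thread. The class `ParallelExtractionSketch`, the helper `loadOne`, and the `String` values are hypothetical stand-ins for the real NAR unpacking and `Connector` objects, not the PR's code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelExtractionSketch {

    // Hypothetical stand-in for the per-archive work (unpacking a NAR and
    // reading its connector definition); here it only derives a name.
    static Map.Entry<String, String> loadOne(String archive) {
        String name = archive.replace(".nar", "");
        return new AbstractMap.SimpleImmutableEntry<>(name, archive);
    }

    static TreeMap<String, String> loadAll(List<String> archives) {
        TreeMap<String, String> result = new TreeMap<>();
        if (archives.isEmpty()) {
            return result;
        }
        // Never start more threads than there are archives to process.
        int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
        ExecutorService oneTimeExecutor = Executors.newFixedThreadPool(nThreads);
        try {
            List<CompletableFuture<Map.Entry<String, String>>> futures = new ArrayList<>();
            for (String archive : archives) {
                futures.add(CompletableFuture.supplyAsync(() -> loadOne(archive), oneTimeExecutor));
            }
            // join() blocks until each task finishes; results are merged on the
            // calling thread, so the TreeMap is never modified concurrently.
            for (CompletableFuture<Map.Entry<String, String>> future : futures) {
                Map.Entry<String, String> entry = future.join();
                result.put(entry.getKey(), entry.getValue());
            }
        } finally {
            // The pool is only needed once, so release its threads when done.
            oneTimeExecutor.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(loadAll(Arrays.asList("b.nar", "a.nar", "c.nar")).keySet());
    }
}
```

Collecting the futures first and joining afterwards keeps the shared map single-threaded, which avoids needing a concurrent map just for startup.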
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is already covered by existing tests: the unit tests under PulsarFunctionLocalRunTest exercise this path, and the CI for each existing connector will also cover the change.
### Does this pull request potentially affect one of the following parts:
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] Anything that affects deployment
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
### Matching PR in forked repository
PR in forked repository: (https://github.com/aymkhalil/pulsar/pull/2)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984775978
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ if (archives.isEmpty()) {
+ log.warn("Connectors archive directory is empty");
+ return new TreeMap<>();
+ }
+
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(4, archives.size());
Review Comment:
Makes sense. I read a few warnings about the behavior of `Runtime.getRuntime().availableProcessors()` in containerized environments, but on reflection it is more reasonable than hardcoding! (Java also uses it to initialize things like `newWorkStealingPool`.)
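A minimal sketch of the sizing rule being discussed: bound the pool by the CPU count the JVM reports and by the number of archives. The helper `poolSize` and the class name are hypothetical. How the reported CPU count relates to container limits depends on the JVM version and flags, so treat that part as something to verify for your runtime.

```java
class PoolSizeSketch {

    // Hypothetical helper: never exceed the reported CPU count or the amount
    // of work, and never return less than one thread.
    static int poolSize(int availableProcessors, int taskCount) {
        return Math.max(1, Math.min(availableProcessors, taskCount));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // 58 matches the number of connector definitions in the sample log output.
        System.out.println("pool size for 58 archives: " + poolSize(cpus, 58));
    }
}
```

Passing the processor count in as a parameter keeps the rule testable without depending on the machine the code runs on.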
[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985128703
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
+ if (archives.isEmpty()) {
+ return new TreeMap<>();
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+ log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
+ oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
+ new ThreadFactoryBuilder().setNameFormat("search-connectors-executor-%d").build());
Review Comment:
You are right, it is more about extraction or unpacking. The function name is `searchForConnectors` but the parallel work is the extraction. Updating.
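The thread-name prefix in the diff is set through Guava's `ThreadFactoryBuilder`. For readers without Guava at hand, the same naming idea can be sketched with the standard library alone. The class `NamedThreadFactorySketch` and its helpers are hypothetical, and the `-N` suffix scheme is an assumption chosen to resemble the `%d` format in the diff.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactorySketch {

    // Builds threads named "<prefix>-0", "<prefix>-1", ... so log lines show
    // which pool a message came from.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Runs one task on a single-thread pool and reports the worker's name.
    static String nameOfWorker(String prefix) {
        ExecutorService executor = Executors.newFixedThreadPool(1, named(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorker("connector-extraction-executor"));
    }
}
```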
[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984765755
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
Review Comment:
`Collections.emptyMap()` doesn't work here because the method returns a `TreeMap`. If the concern is repeated allocation, please note that this code is invoked only at startup. Interestingly, I couldn't find a java.util or Guava API to create an immutable TreeMap.
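The type mismatch can be seen in a small sketch. The class name and the `String` value type are placeholders. The second method shows what would be possible only if the declared return type were widened to `NavigableMap`, which is an assumption for illustration and not something the PR does.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

class EmptyTreeMapSketch {

    // Mirrors the TreeMap return type discussed in the thread.
    static TreeMap<String, String> emptyResult() {
        // return Collections.emptyMap();  // does not compile: Map<K, V> is not a TreeMap<K, V>
        return new TreeMap<>();
    }

    // With a wider declared type, java.util offers a shared read-only empty map.
    static NavigableMap<String, String> emptyReadOnlyResult() {
        return Collections.emptyNavigableMap();
    }

    public static void main(String[] args) {
        System.out.println(emptyResult().isEmpty());
        System.out.println(emptyReadOnlyResult().isEmpty());
    }
}
```

Because callers receive a concrete `TreeMap`, returning a fresh instance is the simplest option that keeps the signature unchanged.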
[GitHub] [pulsar] codecov-commenter commented on pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1280270754
# [Codecov](https://codecov.io/gh/apache/pulsar/pull/17902?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#17902](https://codecov.io/gh/apache/pulsar/pull/17902?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6f99bb6) into [master](https://codecov.io/gh/apache/pulsar/commit/6c65ca0d8a80bfaaa4d5869e0cea485f5c94369b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6c65ca0) will **decrease** coverage by `0.02%`.
> The diff coverage is `33.33%`.
```diff
@@ Coverage Diff @@
## master #17902 +/- ##
============================================
- Coverage 34.91% 34.89% -0.03%
+ Complexity 5707 4462 -1245
============================================
Files 607 393 -214
Lines 53396 43422 -9974
Branches 5712 4462 -1250
============================================
- Hits 18644 15152 -3492
+ Misses 32119 26161 -5958
+ Partials 2633 2109 -524
```
| Flag | Coverage Δ | |
|---|---|---|
| unittests | `34.89% <33.33%> (-0.03%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/17902?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...broker/delayed/InMemoryDelayedDeliveryTracker.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9kZWxheWVkL0luTWVtb3J5RGVsYXllZERlbGl2ZXJ5VHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
| [.../service/SystemTopicBasedTopicPoliciesService.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljQmFzZWRUb3BpY1BvbGljaWVzU2VydmljZS5qYXZh) | `49.68% <0.00%> (-1.91%)` | :arrow_down: |
| [...g/apache/pulsar/compaction/CompactedTopicImpl.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbXBhY3Rpb24vQ29tcGFjdGVkVG9waWNJbXBsLmphdmE=) | `68.57% <0.00%> (+57.85%)` | :arrow_up: |
| [...apache/pulsar/proxy/server/DirectProxyHandler.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc2VydmVyL0RpcmVjdFByb3h5SGFuZGxlci5qYXZh) | `63.63% <50.00%> (ø)` | |
| [...pulsar/broker/admin/impl/PersistentTopicsBase.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1BlcnNpc3RlbnRUb3BpY3NCYXNlLmphdmE=) | `12.24% <100.00%> (+0.84%)` | :arrow_up: |
| [...rg/apache/pulsar/broker/service/BrokerService.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Jyb2tlclNlcnZpY2UuamF2YQ==) | `50.00% <100.00%> (+1.99%)` | :arrow_up: |
| [...ion/pendingack/impl/MLPendingAckStoreProvider.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvTUxQZW5kaW5nQWNrU3RvcmVQcm92aWRlci5qYXZh) | `19.27% <0.00%> (-48.20%)` | :arrow_down: |
| [...tent/NonPersistentDispatcherMultipleConsumers.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL25vbnBlcnNpc3RlbnQvTm9uUGVyc2lzdGVudERpc3BhdGNoZXJNdWx0aXBsZUNvbnN1bWVycy5qYXZh) | `0.00% <0.00%> (-48.15%)` | :arrow_down: |
| [...apache/pulsar/broker/service/TopicListService.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1RvcGljTGlzdFNlcnZpY2UuamF2YQ==) | `10.65% <0.00%> (-44.27%)` | :arrow_down: |
| [...transaction/pendingack/impl/MLPendingAckStore.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvTUxQZW5kaW5nQWNrU3RvcmUuamF2YQ==) | `1.63% <0.00%> (-35.66%)` | :arrow_down: |
| ... and [336 more](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [pulsar] eolivelli closed pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
eolivelli closed pull request #17902: [improve][fn] Run connectors extraction in parallel
URL: https://github.com/apache/pulsar/pull/17902
[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984779306
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ if (archives.isEmpty()) {
+ log.warn("Connectors archive directory is empty");
+ return new TreeMap<>();
Review Comment:
Same comment as above.
[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984276796
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
Review Comment:
Collections.emptyMap() ?
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ if (archives.isEmpty()) {
+ log.warn("Connectors archive directory is empty");
+ return new TreeMap<>();
Review Comment:
Collections.emptyMap() ?
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ if (archives.isEmpty()) {
+ log.warn("Connectors archive directory is empty");
+ return new TreeMap<>();
+ }
+
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(4, archives.size());
Review Comment:
what about using
```
Runtime.getRuntime().availableProcessors()
```
?
I agree we don't need this to be configurable
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ if (archives.isEmpty()) {
+ log.warn("Connectors archive directory is empty");
Review Comment:
not necessary, it's ok to have an empty connectors dir
[GitHub] [pulsar] eolivelli commented on a diff in pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985243883
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
+ if (archives.isEmpty()) {
+ return new TreeMap<>();
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+ log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
+ oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
+ new ThreadFactoryBuilder().setNameFormat("connector-extraction-executor-%d").build());
+ List<CompletableFuture<Map.Entry<String, Connector>>> futures = new ArrayList<>();
+ for (Path archive : archives) {
+ CompletableFuture future = CompletableFuture.supplyAsync(() ->
Review Comment:
Nit: we are missing the generic type here
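To illustrate the nit: declaring the future with its full type argument lets the compiler check what the task produces, whereas a raw `CompletableFuture` would only produce unchecked warnings. In this sketch the class name, the method `lengthOf`, and the `Integer` value are hypothetical placeholders for the `Connector` entries in the diff.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class TypedFutureSketch {

    static CompletableFuture<Map.Entry<String, Integer>> lengthOf(String name) {
        // Fully parameterized declaration: the lambda's result is type-checked
        // against Map.Entry<String, Integer> at compile time.
        CompletableFuture<Map.Entry<String, Integer>> future = CompletableFuture.supplyAsync(
                () -> new AbstractMap.SimpleImmutableEntry<String, Integer>(name, name.length()));
        return future;
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> entry = lengthOf("kinesis").join();
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```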
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
+ if (archives.isEmpty()) {
+ return new TreeMap<>();
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+ log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
Review Comment:
We can log this at debug level. It is not useful.
Please wrap it in an `if (log.isDebugEnabled())` guard, as that is the code style in Pulsar for DEBUG statements.
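A sketch of the guard pattern being requested. To keep the example self-contained it uses `java.util.logging` as a stand-in, where `isLoggable(Level.FINE)` plays the role of SLF4J's `log.isDebugEnabled()`; the class and method names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class DebugGuardSketch {
    private static final Logger LOG = Logger.getLogger(DebugGuardSketch.class.getName());

    // Returns true when the guard passed and the log call was made.
    public static boolean logPoolSize(int archiveCount, int nThreads) {
        // The guard skips building the argument array when debug output is disabled.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Loading {0} connector definitions with a thread pool of size {1}",
                    new Object[] {archiveCount, nThreads});
            return true;
        }
        return false;
    }
}
```

In the SLF4J form used by Pulsar, the equivalent shape would be `if (log.isDebugEnabled()) { log.debug("...", archives.size(), nThreads); }`.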
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985975578
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
+ if (archives.isEmpty()) {
+ return new TreeMap<>();
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+ log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
Review Comment:
I actually think the log is useful for troubleshooting, no? Both the number of connectors and the number of CPUs are variables, and if someone observes a long bootstrap time, we could quickly support them by ruling out the obvious (for example, they have 100 connectors but only a single CPU).
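To make that scenario concrete, here is a small sketch of the pool-size expression from the diff. `PoolSizeSketch` and `poolSize` are hypothetical names; only the `Math.min(...)` expression comes from the PR.

```java
class PoolSizeSketch {
    // Mirrors the diff: never more threads than CPUs, never more threads than archives.
    public static int poolSize(int availableProcessors, int archiveCount) {
        return Math.min(availableProcessors, archiveCount);
    }
}
```

Under this expression, `poolSize(1, 100)` is 1, so 100 connectors on a single CPU are still extracted one at a time, which is the case the log line would reveal.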
[GitHub] [pulsar] aymkhalil commented on pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1265724498
Side note for committers (@eolivelli @nicoloboschi): Please update the commit message to match the PR title when merging.
[GitHub] [pulsar] Technoboy- merged pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #17902:
URL: https://github.com/apache/pulsar/pull/17902
[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984766174
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ if (archives.isEmpty()) {
+ log.warn("Connectors archive directory is empty");
Review Comment:
removing.
[GitHub] [pulsar] lhotari commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985112951
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
- TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return connectors;
+ return new TreeMap<>();
}
+ List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
+ archives.add(archive);
+ }
+ }
+ if (archives.isEmpty()) {
+ return new TreeMap<>();
+ }
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
+ ExecutorService oneTimeExecutor = null;
+ try {
+ int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+ log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
+ oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
+ new ThreadFactoryBuilder().setNameFormat("search-connectors-executor-%d").build());
Review Comment:
Isn't this completely generic? I'm just wondering about the "search" part in the naming.
```suggestion
new ThreadFactoryBuilder().setNameFormat("connector-extraction-executor-%d").build());
```
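For readers unfamiliar with the name format, a minimal sketch of what it does to thread names. The PR uses Guava's `ThreadFactoryBuilder`; the class below is a hypothetical JDK-only stand-in that assumes the same behavior, substituting a counter starting at 0 for `%d`.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactorySketch implements ThreadFactory {
    private final String nameFormat;
    private final AtomicInteger counter = new AtomicInteger();

    public NamedThreadFactorySketch(String nameFormat) {
        this.nameFormat = nameFormat;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Each new thread gets the next counter value substituted for %d.
        return new Thread(task, String.format(nameFormat, counter.getAndIncrement()));
    }
}
```

Passing such a factory to `Executors.newFixedThreadPool(nThreads, factory)` makes the pool's threads identifiable in log lines and thread dumps, which is why the wording of the prefix matters.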
[GitHub] [pulsar] lhotari commented on pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1264400162
Oh, it seems that I misunderstood the naming.
> `... INFO org.apache.pulsar.functions.utils.io.ConnectorUtils - Searching for connectors in /pulsar/./connectors`
"connector extraction" etc. would make more sense to me.
[GitHub] [pulsar] lhotari commented on pull request #17902: [improve][fn] Run search connectors in parallel
Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1264399685
"Run search connectors in parallel"
Isn't this completely generic?
[GitHub] [pulsar] eolivelli commented on pull request #17902: [improve][fn] Run connectors extraction in parallel
Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1270411800
Closed and reopened to trigger CI.
Something is blocked.