Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/06 16:58:56 UTC

[GitHub] [pulsar] aymkhalil opened a new pull request, #17902: [improve][fn] Run connectors extraction in parallel

aymkhalil opened a new pull request, #17902:
URL: https://github.com/apache/pulsar/pull/17902

   
   ### Motivation
   
   Improve functions worker bootstrap time when the number of connectors is large. Currently, all NAR files are unpacked during startup to perform validation steps (e.g. checking that the connector class implements the correct interfaces). To do that, the NarClassLoader unpacks all NARs to disk sequentially, which can delay startup.
   
   On my machine (Mac M1) I observed an improvement of roughly 3x (from 48 seconds down to 15 seconds). I know the maximum number of threads chosen (4) is a magic number (I have not seen improvements beyond it), but if this is the right approach and it is worth it, we can make the number configurable (I need guidance on the right place for the config).
   
   Sample output:
   ```
   2022-09-30T01:22:13,778+0000 [main] INFO  org.apache.pulsar.functions.utils.io.ConnectorUtils - Searching for connectors in /pulsar/./connectors
   2022-09-30T01:22:13,779+0000 [main] INFO  org.apache.pulsar.functions.utils.io.ConnectorUtils - Loading 58 connector definitions with a thread pool of size 4
   2022-09-30T01:22:13,781+0000 [search-connectors-executor-0] INFO  org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-io-debezium-mssql-2.10.1.8-SNAPSHOT.nar-unpacked
   2022-09-30T01:22:13,781+0000 [search-connectors-executor-1] INFO  org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-snowflake-connector-0.1.10.nar-unpacked
   2022-09-30T01:22:13,781+0000 [search-connectors-executor-2] INFO  org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-io-mongo-2.10.1.8-SNAPSHOT.nar-unpacked
   2022-09-30T01:22:13,784+0000 [search-connectors-executor-3] INFO  org.apache.pulsar.common.nar.NarUnpacker - Created directory /tmp/pulsar-nar/pulsar-io-kinesis-2.10.1.8-SNAPSHOT.nar-unpacked
   2022-09-30T01:22:13,895+0000 [search-connectors-executor-2] INFO  org.apache.pulsar.common.nar.NarUnpacker - Extracting /pulsar/./connectors/pulsar-io-mongo-2.10.1.8-SNAPSHOT.nar to /tmp/pulsar-nar/pulsar-io-mongo-2.10.1.8-SNAPSHOT.nar-unpacked/gs7wF7XmmgT1MjvpwSxFfA
   ```
   
   ### Modifications
   
   Run the connectors search function (which unpacks the NARs) in parallel via a CachedThreadPool.
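
A minimal sketch of the idea, not the PR's actual code: submit one task per archive to a bounded pool and gather the results on the calling thread. The class `ParallelExtractionSketch`, the helper `describe`, the `maxThreads` parameter, and the string-valued map are hypothetical stand-ins for the NAR unpacking and the `Connector` objects. A fixed-size pool is used here, as in the diff under review, rather than a cached one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelExtractionSketch {

    // Hypothetical stand-in for the expensive per-archive work
    // (unpacking a NAR and reading its connector definition).
    static String describe(String archiveName) {
        return "definition-of-" + archiveName;
    }

    // Runs describe() for every archive on a bounded pool and returns the
    // results keyed by archive name.
    static TreeMap<String, String> loadAll(List<String> archives, int maxThreads) {
        TreeMap<String, String> result = new TreeMap<>();
        if (archives.isEmpty()) {
            return result; // an empty directory is fine; nothing to do
        }
        int nThreads = Math.max(1, Math.min(maxThreads, archives.size()));
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String archive : archives) {
                Callable<String> task = () -> describe(archive);
                futures.add(executor.submit(task));
            }
            // Collect on the calling thread so the TreeMap is never
            // mutated concurrently.
            for (int i = 0; i < archives.size(); i++) {
                result.put(archives.get(i), futures.get(i).get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while loading archives", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to load an archive", e.getCause());
        } finally {
            executor.shutdown(); // the pool is only needed for this one pass
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(loadAll(List.of("b.nar", "a.nar"), 4));
    }
}
```

Gathering the futures in submission order keeps the result deterministic, and the `TreeMap` is only touched by one thread, so no concurrent map is needed.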
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change is already covered by existing tests: the unit tests under PulsarFunctionLocalRunTest already cover this path, and the CI for each existing connector will also exercise the change.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: (https://github.com/aymkhalil/pulsar/pull/2)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984775978


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");
+            return new TreeMap<>();
+        }
+
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(4, archives.size());

Review Comment:
   Makes sense. I read a few warnings about how `availableProcessors()` behaves in containerized environments, but on second thought it is more reasonable than hardcoding! (Java also uses it to initialize things like `newWorkStealingPool`.)
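
A small sketch of the sizing rule being discussed, assuming a hypothetical helper named `poolSize`: cap the pool at the smaller of the processor count reported by the JVM and the number of tasks. On JVMs with container support the reported count generally reflects the container's CPU limit, though this depends on the JVM version and flags, so treat the value as a hint rather than a guarantee.

```java
final class PoolSizeSketch {

    // Returns a pool size that never exceeds the number of tasks or the
    // number of processors the JVM reports.
    static int poolSize(int taskCount) {
        if (taskCount < 1) {
            throw new IllegalArgumentException("taskCount must be positive");
        }
        // availableProcessors() is documented to return at least 1.
        int cpus = Runtime.getRuntime().availableProcessors();
        return Math.min(cpus, taskCount);
    }

    public static void main(String[] args) {
        System.out.println("Pool size for 58 archives: " + poolSize(58));
    }
}
```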





[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985128703


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
+            oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
+                    new ThreadFactoryBuilder().setNameFormat("search-connectors-executor-%d").build());

Review Comment:
   You are right, it is more about extraction or unpacking. The function name is `searchForConnectors` but the parallel work is the extraction. Updating.
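
For illustration only, pool threads can also be named without Guava's `ThreadFactoryBuilder` by using a small JDK-only factory. The class name and the `connectors-extraction-executor` prefix below are hypothetical; the thread name finally chosen in the PR is not shown in this thread.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactorySketch implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger nextId = new AtomicInteger();

    NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Produces names such as "<prefix>-0", "<prefix>-1", ...
        return new Thread(task, prefix + "-" + nextId.getAndIncrement());
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactorySketch("connectors-extraction-executor");
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

A descriptive prefix makes log lines like those in the sample output easy to attribute to the extraction step.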





[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984765755


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();

Review Comment:
   Collections.emptyMap() doesn't work with TreeMap. If the concern is repeated initialization, please note that this code is invoked only at startup. Interestingly, I couldn't find a java.util or Guava API to create an immutable TreeMap.
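
To illustrate the point about return types (a sketch with hypothetical class and method names): a method declared to return the concrete `TreeMap` cannot return `Collections.emptyMap()` or `Collections.emptySortedMap()`, because those are typed as `Map` and `SortedMap` respectively. Guava's `ImmutableSortedMap` is likewise a `NavigableMap` implementation, not a `TreeMap`.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

final class EmptyTreeMapSketch {

    // Mirrors a method whose declared return type is the concrete TreeMap.
    static TreeMap<String, String> emptyResult() {
        // return Collections.emptyMap();        // does not compile: Map is not a TreeMap
        // return Collections.emptySortedMap();  // does not compile: SortedMap is not a TreeMap
        return new TreeMap<>();
    }

    // Only if the declared type were widened to SortedMap could a shared,
    // immutable empty instance be returned.
    static SortedMap<String, String> emptySorted() {
        return Collections.emptySortedMap();
    }

    public static void main(String[] args) {
        System.out.println(emptyResult().isEmpty());
        System.out.println(emptySorted().isEmpty());
    }
}
```

Widening the return type would change the method's signature for callers, so allocating a fresh empty `TreeMap` on a startup-only path is the less invasive choice.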





[GitHub] [pulsar] codecov-commenter commented on pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1280270754

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17902?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17902](https://codecov.io/gh/apache/pulsar/pull/17902?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6f99bb6) into [master](https://codecov.io/gh/apache/pulsar/commit/6c65ca0d8a80bfaaa4d5869e0cea485f5c94369b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6c65ca0) will **decrease** coverage by `0.02%`.
   > The diff coverage is `33.33%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #17902      +/-   ##
   ============================================
   - Coverage     34.91%   34.89%   -0.03%     
   + Complexity     5707     4462    -1245     
   ============================================
     Files           607      393     -214     
     Lines         53396    43422    -9974     
     Branches       5712     4462    -1250     
   ============================================
   - Hits          18644    15152    -3492     
   + Misses        32119    26161    -5958     
   + Partials       2633     2109     -524     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `34.89% <33.33%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/17902?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...broker/delayed/InMemoryDelayedDeliveryTracker.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9kZWxheWVkL0luTWVtb3J5RGVsYXllZERlbGl2ZXJ5VHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../service/SystemTopicBasedTopicPoliciesService.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljQmFzZWRUb3BpY1BvbGljaWVzU2VydmljZS5qYXZh) | `49.68% <0.00%> (-1.91%)` | :arrow_down: |
   | [...g/apache/pulsar/compaction/CompactedTopicImpl.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbXBhY3Rpb24vQ29tcGFjdGVkVG9waWNJbXBsLmphdmE=) | `68.57% <0.00%> (+57.85%)` | :arrow_up: |
   | [...apache/pulsar/proxy/server/DirectProxyHandler.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc2VydmVyL0RpcmVjdFByb3h5SGFuZGxlci5qYXZh) | `63.63% <50.00%> (ø)` | |
   | [...pulsar/broker/admin/impl/PersistentTopicsBase.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1BlcnNpc3RlbnRUb3BpY3NCYXNlLmphdmE=) | `12.24% <100.00%> (+0.84%)` | :arrow_up: |
   | [...rg/apache/pulsar/broker/service/BrokerService.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Jyb2tlclNlcnZpY2UuamF2YQ==) | `50.00% <100.00%> (+1.99%)` | :arrow_up: |
   | [...ion/pendingack/impl/MLPendingAckStoreProvider.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvTUxQZW5kaW5nQWNrU3RvcmVQcm92aWRlci5qYXZh) | `19.27% <0.00%> (-48.20%)` | :arrow_down: |
   | [...tent/NonPersistentDispatcherMultipleConsumers.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL25vbnBlcnNpc3RlbnQvTm9uUGVyc2lzdGVudERpc3BhdGNoZXJNdWx0aXBsZUNvbnN1bWVycy5qYXZh) | `0.00% <0.00%> (-48.15%)` | :arrow_down: |
   | [...apache/pulsar/broker/service/TopicListService.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1RvcGljTGlzdFNlcnZpY2UuamF2YQ==) | `10.65% <0.00%> (-44.27%)` | :arrow_down: |
   | [...transaction/pendingack/impl/MLPendingAckStore.java](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvTUxQZW5kaW5nQWNrU3RvcmUuamF2YQ==) | `1.63% <0.00%> (-35.66%)` | :arrow_down: |
   | ... and [336 more](https://codecov.io/gh/apache/pulsar/pull/17902/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] eolivelli closed pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
eolivelli closed pull request #17902: [improve][fn] Run connectors extraction in parallel
URL: https://github.com/apache/pulsar/pull/17902




[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984779306


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");
+            return new TreeMap<>();

Review Comment:
   Same comment as above.





[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984276796


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();

Review Comment:
   Collections.emptyMap() ? 



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");
+            return new TreeMap<>();

Review Comment:
   Collections.emptyMap() ?



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");
+            return new TreeMap<>();
+        }
+
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(4, archives.size());

Review Comment:
   what about using 
   ```
   Runtime.getRuntime().availableProcessors()
   ```
   ? 
   
   I agree we don't need this to be configurable 



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");

Review Comment:
   Not necessary; it's OK to have an empty connectors directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985243883


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
+            oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
+                    new ThreadFactoryBuilder().setNameFormat("connector-extraction-executor-%d").build());
+            List<CompletableFuture<Map.Entry<String, Connector>>> futures = new ArrayList<>();
+            for (Path archive : archives) {
+                CompletableFuture future = CompletableFuture.supplyAsync(() ->

Review Comment:
   Nit: we are missing the generic type here
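
To illustrate the nit, here is a sketch of the same fan-out pattern with the future fully parameterized. It assumes a simplified stand-in for the per-archive work: `load`, `loadAll`, and the `String` payload are hypothetical, whereas the PR uses `Connector` values and Guava's `ThreadFactoryBuilder`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TypedFutures {
    // Hypothetical stand-in for unpacking one archive and reading its definition.
    static Map.Entry<String, String> load(String archive) {
        String name = archive.replace(".nar", "");
        return new AbstractMap.SimpleEntry<>(name, "definition of " + name);
    }

    static TreeMap<String, String> loadAll(List<String> archives) {
        TreeMap<String, String> result = new TreeMap<>();
        if (archives.isEmpty()) {
            return result;
        }
        int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            List<CompletableFuture<Map.Entry<String, String>>> futures = new ArrayList<>();
            for (String archive : archives) {
                // Fully parameterized: no raw CompletableFuture, no unchecked warning.
                CompletableFuture<Map.Entry<String, String>> future =
                        CompletableFuture.supplyAsync(() -> load(archive), executor);
                futures.add(future);
            }
            // join() waits for each task; results are merged on the calling thread only.
            for (CompletableFuture<Map.Entry<String, String>> future : futures) {
                Map.Entry<String, String> entry = future.join();
                result.put(entry.getKey(), entry.getValue());
            }
        } finally {
            executor.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(loadAll(List.of("kafka.nar", "cassandra.nar", "jdbc.nar")).keySet());
    }
}
```

With the generic type in place the compiler checks that every future yields a `Map.Entry`, so the merge loop needs no casts.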



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);

Review Comment:
   We can log this at debug level. It is not useful.
   Please add the `if (log.isDebugEnabled())` guard, as this is the code style in Pulsar for DEBUG statements.
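
A sketch of the guard pattern the comment asks for. To stay self-contained it uses `java.util.logging`, where `isLoggable(Level.FINE)` plays the role of SLF4J's `log.isDebugEnabled()`; that substitution, and the names `DebugGuard`, `summary`, and `logPlan`, are assumptions of this example rather than Pulsar code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class DebugGuard {
    static final Logger LOG = Logger.getLogger(DebugGuard.class.getName());
    // Counts how often the message argument is actually built.
    static final AtomicInteger SUMMARY_CALLS = new AtomicInteger();

    // Hypothetical message argument that we would rather not build needlessly.
    static String summary(int archives, int threads) {
        SUMMARY_CALLS.incrementAndGet();
        return archives + " archives, pool size " + threads;
    }

    static void logPlan(int archives, int threads) {
        // Guard: the argument is computed only when debug-level (FINE) logging is enabled.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Loading connector definitions: " + summary(archives, threads));
        }
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO);
        logPlan(100, 1); // guard is false, summary() is not called
        LOG.setLevel(Level.FINE);
        logPlan(100, 1); // guard is true, summary() is called once
        System.out.println("summary() calls: " + SUMMARY_CALLS.get());
    }
}
```

With SLF4J the equivalent shape would be `if (log.isDebugEnabled()) { log.debug(...); }`.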





[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985975578


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);

Review Comment:
   I actually think the log is useful for troubleshooting, no? Both the number of connectors and the number of CPUs are variables, and if someone observes a long bootstrap time, we could quickly support them by ruling out the obvious (for example, they have 100 connectors but only a single CPU).
   
   





[GitHub] [pulsar] aymkhalil commented on pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1265724498

   Side note for committers (@eolivelli @nicoloboschi): Please update the commit message to match the PR title when merging.




[GitHub] [pulsar] Technoboy- merged pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #17902:
URL: https://github.com/apache/pulsar/pull/17902




[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984766174


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");

Review Comment:
   removing.





[GitHub] [pulsar] lhotari commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985112951


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
+            oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
+                    new ThreadFactoryBuilder().setNameFormat("search-connectors-executor-%d").build());

Review Comment:
   Isn't this completely generic? I'm just wondering about the "search" part in the naming.
   ```suggestion
                       new ThreadFactoryBuilder().setNameFormat("connector-extraction-executor-%d").build());
   ```
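
For readers without Guava on the classpath, the thread-naming idea can be sketched with a plain `ThreadFactory`. This is an illustration under that assumption, not the PR's code (which uses `ThreadFactoryBuilder.setNameFormat`); `NamedThreads`, `named`, and `nameOfWorker` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreads {
    // Produces threads named "<prefix>-0", "<prefix>-1", ... so that thread dumps
    // and log lines show which pool a worker belongs to.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    // Runs one task on a single-thread pool and reports the worker thread's name.
    static String nameOfWorker(String prefix) {
        ExecutorService executor = Executors.newFixedThreadPool(1, named(prefix));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorker("connector-extraction-executor"));
    }
}
```

A descriptive prefix matters mostly for diagnostics: it is what appears in the `[thread]` field of log lines and in thread dumps taken during a slow startup.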





[GitHub] [pulsar] lhotari commented on pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1264400162

   Oh, it seems that I misunderstood the naming.
   > `... INFO  org.apache.pulsar.functions.utils.io.ConnectorUtils - Searching for connectors in /pulsar/./connectors`
   
   "connector extraction" etc. would make more sense to me.
   




[GitHub] [pulsar] lhotari commented on pull request #17902: [improve][fn] Run search connectors in parallel

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1264399685

   "Run search connectors in parallel"
   
   Isn't this completely generic?




[GitHub] [pulsar] eolivelli commented on pull request #17902: [improve][fn] Run connectors extraction in parallel

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#issuecomment-1270411800

   Closed and reopened to trigger CI.
   Something is blocked.

