You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/05 20:07:41 UTC

[GitHub] [pulsar] aymkhalil commented on a diff in pull request #17902: [improve][fn] Run connectors extraction in parallel

aymkhalil commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r985975578


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);

Review Comment:
   I actually think the log is useful for troubleshooting, no? Both the number of connectors and the number of CPUs are variables, and if someone observes a long bootstrap time, we could quickly support them by ruling out the obvious (e.g. they run on a single CPU but have 100 connectors).



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);

Review Comment:
   I actually think the log is useful for troubleshooting, no? Both the number of connectors and the number of CPUs are variables, and if someone observes a long bootstrap time, we could quickly support them by ruling out the obvious (for example, they have 100 connectors but have a single CPU...)
   
   



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
+        if (archives.isEmpty()) {
+            return new TreeMap<>();
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
+            log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);

Review Comment:
   I actually think the log is useful for troubleshooting, no? Both the number of connectors and the number of CPUs are variables, and if someone observes a long bootstrap time, we could quickly support them by ruling out the obvious (for example, they have 100 connectors but only a single CPU...)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org