You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/30 07:13:59 UTC

[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17902: [improve][fn] Run search connectors in parallel

nicoloboschi commented on code in PR #17902:
URL: https://github.com/apache/pulsar/pull/17902#discussion_r984276796


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();

Review Comment:
   Could this return `Collections.emptyMap()` instead?



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");
+            return new TreeMap<>();

Review Comment:
   Could this return `Collections.emptyMap()` instead?



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");
+            return new TreeMap<>();
+        }
+
+        ExecutorService oneTimeExecutor = null;
+        try {
+            int nThreads = Math.min(4, archives.size());

Review Comment:
   What about using `Runtime.getRuntime().availableProcessors()` instead of a hard-coded 4?
   
   I agree we don't need this to be configurable.



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -142,49 +149,83 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
 
-        TreeMap<String, Connector> connectors = new TreeMap<>();
         if (!path.toFile().exists()) {
             log.warn("Connectors archive directory not found");
-            return connectors;
+            return new TreeMap<>();
         }
 
+        List<Path> archives = new ArrayList<>();
         try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
             for (Path archive : stream) {
-                try {
-
-                    NarClassLoader ncl = NarClassLoaderBuilder.builder()
-                            .narFile(new File(archive.toString()))
-                            .extractionDirectory(narExtractionDirectory)
-                            .build();
-
-                    Connector.ConnectorBuilder connectorBuilder = Connector.builder();
-                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
-                    log.info("Found connector {} from {}", cntDef, archive);
-
-                    connectorBuilder.archivePath(archive);
-                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
-                            connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
-                                    .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
-                        }
-                    }
+                archives.add(archive);
+            }
+        }
 
-                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
-                            connectorBuilder.sinkConfigFieldDefinitions(
-                                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
-                        }
-                    }
+        if (archives.isEmpty()) {
+            log.warn("Connectors archive directory is empty");

Review Comment:
   This is not necessary; it's OK to have an empty connectors directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org