Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/21 11:19:10 UTC

[GitHub] [pulsar] gaozhangmin opened a new pull request #14346: [Pulsar-Functions] Use ComputeIfAbsent instead in FunctionCacheManagerImpl

gaozhangmin opened a new pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346


   
   
   ### Motivation
   `cacheFunctions` is a `ConcurrentHashMap`, so the `synchronized` block is not needed.
   
   
   ### Modifications
   
   Remove synchronized block.
   
   
   
   ### Documentation
   
   Check the box below or label this PR directly (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     (Please explain why)
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809655470



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -116,43 +112,35 @@ public void registerFunctionInstanceWithArchive(String fid, String eid,
             throw new NullPointerException("FunctionID not set");
         }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
 
-            if (null != entry) {
-                entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
-                return;
-            }
+        if (null != entry) {
+            entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
+            return;
+        }
 
-            // Create new cache entry
-            try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
-            } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
-            }
+        // Create new cache entry
+        try {
+            cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }

Review comment:
       We should also use `computeIfAbsent` here.

##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -73,39 +71,37 @@ public void registerFunctionInstance(String fid,
             throw new NullPointerException("FunctionID not set");
         }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
 
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
+        if (null == entry) {
+            URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+            int count = 0;
+            try {
+                // add jar files to urls
+                for (String jarFile : requiredJarFiles) {
+                    urls[count++] = new File(jarFile).toURI().toURL();
+                }
 
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
+                // add classpaths
+                for (URL url : requiredClasspaths) {
+                    urls[count++] = url;
+                }
 
-                    cacheFunctions.put(
+                cacheFunctions.put(
                         fid,
                         new FunctionCacheEntry(
-                            requiredJarFiles,
-                            requiredClasspaths,
-                            urls,
-                            eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
-                }

Review comment:
       We should use `computeIfAbsent` here, because in the `if` branch the combined `get` and `put` operations are not atomic: two threads can both see a missing entry and both insert one.
   
   We can use an `AtomicBoolean` to check whether a new value was inserted, for example:
   
   ```java
           final Map<String, String> map = new ConcurrentHashMap<>();
           final AtomicBoolean flag = new AtomicBoolean(false);
           final String value = map.computeIfAbsent("A", __ -> {
               flag.set(true);
               return "B";
           });
   ```
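
The `AtomicBoolean` idea above can be sketched in a self-contained form. This is a minimal illustration rather than the Pulsar code: the class `GetOrCreateDemo` and the helper `getOrCreate` are hypothetical names, and a `String` stands in for `FunctionCacheEntry`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Pulsar.
class GetOrCreateDemo {
    // Returns true if this call created the mapping, false if the key already existed.
    static boolean getOrCreate(ConcurrentHashMap<String, String> map, String key, String newValue) {
        final AtomicBoolean created = new AtomicBoolean(false);
        // Lookup and insert happen as one atomic step, unlike a separate get() and put().
        map.computeIfAbsent(key, k -> {
            created.set(true); // runs only when the key was absent
            return newValue;
        });
        return created.get();
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        System.out.println(getOrCreate(map, "A", "B")); // true
        System.out.println(getOrCreate(map, "A", "C")); // false
        System.out.println(map.get("A"));               // B
    }
}
```

The flag tells the caller whether it still has to register itself on an existing entry, which is the distinction the `if`/`else` in the original method makes.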
   







[GitHub] [pulsar] gaozhangmin commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1044196977


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809860614



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -72,87 +70,70 @@ public void registerFunctionInstance(String fid,
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
+        URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+        int count = 0;
+        try {
+            // add jar files to urls
+            for (String jarFile : requiredJarFiles) {
+                urls[count++] = new File(jarFile).toURI().toURL();
+            }
+
+            // add classpaths
+            for (URL url : requiredClasspaths) {
+                urls[count++] = url;
+            }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
-
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
-
-                    cacheFunctions.put(
-                        fid,
-                        new FunctionCacheEntry(
+            FunctionCacheEntry entry = cacheFunctions.putIfAbsent(
+                    fid,
+                    new FunctionCacheEntry(
                             requiredJarFiles,
                             requiredClasspaths,
                             urls,
                             eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
-                }
-            } else {
+            if (entry != null) {
                 entry.register(
-                    eid,
-                    requiredJarFiles,
-                    requiredClasspaths);
+                        eid,
+                        requiredJarFiles,
+                        requiredClasspaths);
             }
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }
     }
 
     @Override
     public void registerFunctionInstanceWithArchive(String fid, String eid,
-                                                    String narArchive, String narExtractionDirectory) throws IOException {
+                                                    String narArchive,
+                                                    String narExtractionDirectory) throws IOException {
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
-
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
+        // Create new cache entry.
+        try {
+            FunctionCacheEntry entry = cacheFunctions.putIfAbsent(fid, new FunctionCacheEntry(narArchive, eid,
+                    rootClassLoader, narExtractionDirectory));
             if (null != entry) {
                 entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
-                return;
-            }
-
-            // Create new cache entry
-            try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
-            } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
             }
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }
     }
 
     @Override
-    public void unregisterFunctionInstance(String fid,
-                                           String eid) {
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null != entry) {
-                if (entry.unregister(eid)) {
-                    cacheFunctions.remove(fid);
-                    entry.close();
-                }
+    public void unregisterFunctionInstance(String fid, String eid) {
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
+        if (null != entry) {
+            if (entry.unregister(eid)) {
+                cacheFunctions.remove(fid);
+                entry.close();
             }
         }

Review comment:
       I think this is the core problem. Without the `synchronized` block, it is hard to make this method thread safe: the first `get` call might return an outdated `FunctionCacheEntry`, and if we then call `remove`, a newly added `FunctionCacheEntry` might be removed unexpectedly.
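
One way to avoid this race without a `synchronized` block is to let the map perform the check and the removal in a single atomic step, for example with `ConcurrentHashMap.computeIfPresent`, which removes the mapping when the remapping function returns `null`. The sketch below is an illustration under stated assumptions, not the change proposed in this PR: `DemoEntry` is a hypothetical stand-in for `FunctionCacheEntry` that only tracks execution IDs, and `AtomicUnregisterDemo` is a hypothetical cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for FunctionCacheEntry: it only tracks execution IDs.
class DemoEntry {
    private final Set<String> executions = ConcurrentHashMap.newKeySet();

    void register(String eid) {
        executions.add(eid);
    }

    // Returns true when the last execution has been unregistered.
    boolean unregister(String eid) {
        executions.remove(eid);
        return executions.isEmpty();
    }
}

// Hypothetical cache; all names here are illustrative.
class AtomicUnregisterDemo {
    final ConcurrentHashMap<String, DemoEntry> cache = new ConcurrentHashMap<>();

    void register(String fid, String eid) {
        // compute() creates or updates the entry atomically for this key.
        cache.compute(fid, (k, existing) -> {
            DemoEntry entry = (existing != null) ? existing : new DemoEntry();
            entry.register(eid);
            return entry;
        });
    }

    void unregister(String fid, String eid) {
        // Returning null removes the mapping in the same atomic step as the check,
        // so a stale reference can never cause a newer entry to be removed.
        cache.computeIfPresent(fid, (k, entry) -> entry.unregister(eid) ? null : entry);
    }

    public static void main(String[] args) {
        AtomicUnregisterDemo demo = new AtomicUnregisterDemo();
        demo.register("f1", "e1");
        demo.register("f1", "e2");
        demo.unregister("f1", "e1");
        System.out.println(demo.cache.containsKey("f1")); // true
        demo.unregister("f1", "e2");
        System.out.println(demo.cache.containsKey("f1")); // false
    }
}
```

In real code, cleanup such as `entry.close()` would still need to happen outside the remapping function, since `ConcurrentHashMap` expects that function to be short.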







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809856204



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -72,87 +70,70 @@ public void registerFunctionInstance(String fid,
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
+        URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+        int count = 0;
+        try {
+            // add jar files to urls
+            for (String jarFile : requiredJarFiles) {
+                urls[count++] = new File(jarFile).toURI().toURL();
+            }
+
+            // add classpaths
+            for (URL url : requiredClasspaths) {
+                urls[count++] = url;
+            }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
-
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
-
-                    cacheFunctions.put(
-                        fid,
-                        new FunctionCacheEntry(
+            FunctionCacheEntry entry = cacheFunctions.putIfAbsent(
+                    fid,
+                    new FunctionCacheEntry(
                             requiredJarFiles,
                             requiredClasspaths,
                             urls,
                             eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
-                }
-            } else {
+            if (entry != null) {
                 entry.register(
-                    eid,
-                    requiredJarFiles,
-                    requiredClasspaths);
+                        eid,
+                        requiredJarFiles,
+                        requiredClasspaths);
             }
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }
     }
 
     @Override
     public void registerFunctionInstanceWithArchive(String fid, String eid,
-                                                    String narArchive, String narExtractionDirectory) throws IOException {
+                                                    String narArchive,
+                                                    String narExtractionDirectory) throws IOException {
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
-
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
+        // Create new cache entry.
+        try {
+            FunctionCacheEntry entry = cacheFunctions.putIfAbsent(fid, new FunctionCacheEntry(narArchive, eid,
+                    rootClassLoader, narExtractionDirectory));
             if (null != entry) {
                 entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
-                return;
-            }
-
-            // Create new cache entry
-            try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
-            } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
             }
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }
     }
 
     @Override
-    public void unregisterFunctionInstance(String fid,
-                                           String eid) {
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null != entry) {
-                if (entry.unregister(eid)) {
-                    cacheFunctions.remove(fid);
-                    entry.close();
-                }
+    public void unregisterFunctionInstance(String fid, String eid) {
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
+        if (null != entry) {
+            if (entry.unregister(eid)) {
+                cacheFunctions.remove(fid);
+                entry.close();
             }
         }

Review comment:
       Sorry, my suggested change here is wrong. It looks like we must not `remove` the entry if `unregister` returns false.







[GitHub] [pulsar] BewareMyPower commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1044232229


   You can use the following code to check whether a previous value was absent, as I mentioned.
   
   ```java
           final AtomicBoolean computed = new AtomicBoolean(false);
           map.computeIfAbsent("A", __ -> {
               computed.set(true); // it's true only when the value is computed
               return newValue();
           });
   ```





[GitHub] [pulsar] BewareMyPower commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1044260854


   For example, here is a possible implementation of `registerFunctionInstanceWithArchive`:
   
   ```java
           final AtomicBoolean computed = new AtomicBoolean(false);
           final AtomicReference<Throwable> throwable = new AtomicReference<>();
           final FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
               try {
                   final FunctionCacheEntry cacheEntry =
                           new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory);
                   computed.set(true);
                   return cacheEntry;
               } catch (IOException e) { // I think it's better to catch `IOException`, instead of `Throwable`
                   throwable.set(e);
                   return null;
               }
           });
           if (throwable.get() != null) {
               Exceptions.rethrowIOException(throwable.get());
           }
           if (!computed.get() && entry != null) { // the key already exists
               entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
           }
   ```
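
As an alternative to carrying the failure out of the lambda in an `AtomicReference`, the checked exception can be tunnelled through an unchecked wrapper: `ConcurrentHashMap.computeIfAbsent` rethrows an unchecked exception thrown by the mapping function and records no mapping in that case. The following sketch assumes a hypothetical `Loader` interface in place of the `FunctionCacheEntry` constructor and a `String` value in place of the entry; it is not the code from the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo; Loader stands in for a constructor that may throw IOException.
class TunnelExceptionDemo {
    interface Loader {
        String load(String key) throws IOException;
    }

    static String getOrLoad(ConcurrentHashMap<String, String> map, String key, Loader loader)
            throws IOException {
        try {
            return map.computeIfAbsent(key, k -> {
                try {
                    return loader.load(k);
                } catch (IOException e) {
                    // Wrap so the exception can leave the lambda; no mapping is recorded.
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            throw e.getCause(); // restore the original checked exception
        }
    }

    public static void main(String[] args) throws IOException {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        System.out.println(getOrLoad(map, "ok", k -> "value-" + k)); // value-ok
        try {
            getOrLoad(map, "bad", k -> {
                throw new IOException("cannot load " + k);
            });
        } catch (IOException e) {
            System.out.println(e.getMessage()); // cannot load bad
        }
        System.out.println(map.containsKey("bad")); // false
    }
}
```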





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809725074



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -73,39 +71,37 @@ public void registerFunctionInstance(String fid,
             throw new NullPointerException("FunctionID not set");
         }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
 
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
+        if (null == entry) {
+            URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+            int count = 0;
+            try {
+                // add jar files to urls
+                for (String jarFile : requiredJarFiles) {
+                    urls[count++] = new File(jarFile).toURI().toURL();
+                }
 
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
+                // add classpaths
+                for (URL url : requiredClasspaths) {
+                    urls[count++] = url;
+                }
 
-                    cacheFunctions.put(
+                cacheFunctions.put(
                         fid,
                         new FunctionCacheEntry(
-                            requiredJarFiles,
-                            requiredClasspaths,
-                            urls,
-                            eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
-                }

Review comment:
       @BewareMyPower PTAL

##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -116,43 +112,35 @@ public void registerFunctionInstanceWithArchive(String fid, String eid,
             throw new NullPointerException("FunctionID not set");
         }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
 
-            if (null != entry) {
-                entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
-                return;
-            }
+        if (null != entry) {
+            entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
+            return;
+        }
 
-            // Create new cache entry
-            try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
-            } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
-            }
+        // Create new cache entry
+        try {
+            cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }

Review comment:
       @BewareMyPower PTAL







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809955217



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -73,72 +73,74 @@ public void registerFunctionInstance(String fid,
             throw new NullPointerException("FunctionID not set");
         }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
-
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
-
-                    cacheFunctions.put(
-                        fid,
-                        new FunctionCacheEntry(
-                            requiredJarFiles,
-                            requiredClasspaths,
-                            urls,
-                            eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
+        final AtomicBoolean computed = new AtomicBoolean(false);
+        final AtomicReference<Throwable> throwable = new AtomicReference<>();
+        FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
+            URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+            int count = 0;
+            try {
+                // add jar files to urls
+                for (String jarFile : requiredJarFiles) {
+                    urls[count++] = new File(jarFile).toURI().toURL();
+                }
+                // add classpaths
+                for (URL url : requiredClasspaths) {
+                    urls[count++] = url;
                 }
-            } else {
-                entry.register(
+                final FunctionCacheEntry cacheEntry = new FunctionCacheEntry(
+                        requiredJarFiles, requiredClasspaths, urls, eid, rootClassLoader);
+                computed.set(true);
+                return cacheEntry;
+            } catch (Throwable cause) {
+                throwable.set(cause);
+                return null;
+            }
+        });
+        if (throwable.get() != null) {
+            Exceptions.rethrowIOException(throwable.get());
+        }
+        if (!computed.get() && entry != null) { // the key already exists
+            entry.register(
                     eid,
                     requiredJarFiles,
                     requiredClasspaths);
-            }
         }
     }
 
     @Override
     public void registerFunctionInstanceWithArchive(String fid, String eid,
-                                                    String narArchive, String narExtractionDirectory) throws IOException {
+                                                    String narArchive,
+                                                    String narExtractionDirectory) throws IOException {
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
-
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null != entry) {
-                entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
-                return;
-            }
-
-            // Create new cache entry
+        // Create new cache entry.
+        final AtomicBoolean computed = new AtomicBoolean(false);
+        final AtomicReference<Throwable> throwable = new AtomicReference<>();
+        FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
             try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
+                final FunctionCacheEntry cacheEntry =
+                        new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory);
+                computed.set(true);
+                return cacheEntry;
             } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
+                throwable.set(cause);
+                return null;
             }
+        });
+        if (throwable.get() != null) {
+            Exceptions.rethrowIOException(throwable.get());
+        }
+        if (null != entry && computed.get()) {
+            entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
         }
+
     }
 
     @Override
-    public void unregisterFunctionInstance(String fid,
-                                           String eid) {
+    public void unregisterFunctionInstance(String fid, String eid) {
         synchronized (cacheFunctions) {
             FunctionCacheEntry entry = cacheFunctions.get(fid);
-
             if (null != entry) {
                 if (entry.unregister(eid)) {
                     cacheFunctions.remove(fid);

Review comment:
       For a concurrent container, we cannot use the object's own lock to make composite operations atomic, because the container's thread safety is not achieved by locking the object itself.
   
   You can run the following code:
   
   ```java
           final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
           map.put("A", "Value");
   
           final ExecutorService executorService = Executors.newSingleThreadExecutor();
           executorService.execute(() -> {
               try {
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
               map.remove("A");
           });
           synchronized (map) { // NOTE: it's meaningless and cannot guarantee any thread safety of the code block
               final String value = map.get("A");
               if (value != null) {
                   Thread.sleep(1000);
                   final String value1 = map.remove("A");
                   System.out.println("value: " + value + ", value1: " + value1);
               }
           }
           executorService.shutdown();
   ```
   
   The output is:
   
   ```
   value: Value, value1: null
   ```
   
   If you change `Thread.sleep(500);` to `Thread.sleep(1500);` in the executor task, the output becomes:
   
   ```
   value: Value, value1: Value
   ```
   
   Therefore, even though `cacheFunctions` is a concurrent hash map, I think all access to the field should be protected by `synchronized`. Given that, `cacheFunctions` should just be a regular hash map.
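
The suggestion in the last paragraph can be sketched as a cache backed by a plain `HashMap` in which every access goes through the same lock. The names below (`LockedCacheDemo`, `RefCount`) are hypothetical, and a simple reference counter stands in for `FunctionCacheEntry`; treat it as an illustration of the locking discipline, not as the Pulsar implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo: every read and write of the map holds the same monitor.
class LockedCacheDemo {
    // Hypothetical stand-in for FunctionCacheEntry: counts registered executions.
    static final class RefCount {
        int count;
    }

    private final Map<String, RefCount> cache = new HashMap<>();

    void register(String fid) {
        synchronized (cache) {
            RefCount ref = cache.get(fid);
            if (ref == null) {
                ref = new RefCount();
                cache.put(fid, ref);
            }
            ref.count++;
        }
    }

    void unregister(String fid) {
        synchronized (cache) {
            RefCount ref = cache.get(fid);
            // get, decrement and remove form one critical section
            if (ref != null && --ref.count == 0) {
                cache.remove(fid);
            }
        }
    }

    boolean contains(String fid) {
        synchronized (cache) {
            return cache.containsKey(fid);
        }
    }

    public static void main(String[] args) {
        LockedCacheDemo demo = new LockedCacheDemo();
        demo.register("f1");
        demo.register("f1");
        demo.unregister("f1");
        System.out.println(demo.contains("f1")); // true
        demo.unregister("f1");
        System.out.println(demo.contains("f1")); // false
    }
}
```

The lock only helps because no code path touches `cache` outside a `synchronized (cache)` block, which is exactly what the demo with the executor thread above violates.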







[GitHub] [pulsar] github-actions[bot] commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1042821404


   @gaozhangmin: Thanks for providing doc info!





[GitHub] [pulsar] eolivelli commented on a change in pull request #14346: [Pulsar-Functions] Use ComputeIfAbsent instead in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r810526246



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -74,9 +76,9 @@ public void registerFunctionInstance(String fid,
         }
 
         synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null == entry) {
+            final AtomicBoolean computed = new AtomicBoolean(false);
+            final AtomicReference<Throwable> throwable = new AtomicReference<>();
+            FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {

Review comment:
       Please note that the contract of `computeIfAbsent` is that the function must be side-effect free, because it can be executed multiple times.
   I am not sure that this function is side-effect free.
   Can you please double-check?
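
   Whether the mapping function can run more than once depends on the map implementation. As far as the `ConcurrentHashMap` Javadoc states, its `computeIfAbsent` is performed atomically, so the function is applied at most once per key, while the default `ConcurrentMap` implementation and `ConcurrentSkipListMap` give weaker guarantees. The sketch below races several threads on one key and counts the calls; `ComputeOnceCheck` and `countMappingCalls` are hypothetical names used only for illustration.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   final class ComputeOnceCheck {
       // Races `threads` callers on a single key and returns how many times
       // the mapping function actually ran.
       static int countMappingCalls(int threads) {
           ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
           AtomicInteger calls = new AtomicInteger();
           CountDownLatch start = new CountDownLatch(1);
           ExecutorService pool = Executors.newFixedThreadPool(threads);
           for (int i = 0; i < threads; i++) {
               pool.execute(() -> {
                   try {
                       start.await(); // release all workers at the same moment
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       return;
                   }
                   map.computeIfAbsent("A", key -> {
                       calls.incrementAndGet(); // side effect, counted for the demo only
                       return "hello";
                   });
               });
           }
           start.countDown();
           pool.shutdown();
           try {
               if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                   throw new IllegalStateException("workers did not finish in time");
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return calls.get();
       }

       public static void main(String[] args) {
           System.out.println("mapping function calls: " + countMappingCalls(8));
       }
   }
   ```

   Even with `ConcurrentHashMap`, the function runs while other updates to the same bin may be blocked, so it should stay short and must not update the same map.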







[GitHub] [pulsar] gaozhangmin commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1042916796


   /pulsarbot run-failure-checks
   
   





[GitHub] [pulsar] gaozhangmin commented on pull request #14346: [Pulsar-Functions] Use ComputeIfAbsent instead in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1046768868


   /pulsarbot run-failure-checks





[GitHub] [pulsar] github-actions[bot] commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1042816780


   @gaozhangmin: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809854176



##########
File path: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -72,87 +70,70 @@ public void registerFunctionInstance(String fid,
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
+        URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+        int count = 0;
+        try {
+            // add jar files to urls
+            for (String jarFile : requiredJarFiles) {
+                urls[count++] = new File(jarFile).toURI().toURL();
+            }
+
+            // add classpaths
+            for (URL url : requiredClasspaths) {
+                urls[count++] = url;
+            }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
-
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
-
-                    cacheFunctions.put(
-                        fid,
-                        new FunctionCacheEntry(
+            FunctionCacheEntry entry = cacheFunctions.putIfAbsent(
+                    fid,
+                    new FunctionCacheEntry(
                             requiredJarFiles,
                             requiredClasspaths,
                             urls,
                             eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
-                }
-            } else {
+            if (entry != null) {
                 entry.register(
-                    eid,
-                    requiredJarFiles,
-                    requiredClasspaths);
+                        eid,
+                        requiredJarFiles,
+                        requiredClasspaths);
             }
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }
     }
 
     @Override
     public void registerFunctionInstanceWithArchive(String fid, String eid,
-                                                    String narArchive, String narExtractionDirectory) throws IOException {
+                                                    String narArchive,
+                                                    String narExtractionDirectory) throws IOException {
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
-
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
+        // Create new cache entry.
+        try {
+            FunctionCacheEntry entry = cacheFunctions.putIfAbsent(fid, new FunctionCacheEntry(narArchive, eid,
+                    rootClassLoader, narExtractionDirectory));
             if (null != entry) {
                 entry.register(eid, Collections.singleton(narArchive), Collections.emptyList());
-                return;
-            }
-
-            // Create new cache entry
-            try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory));
-            } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
             }
+        } catch (Throwable cause) {
+            Exceptions.rethrowIOException(cause);
         }
     }
 
     @Override
-    public void unregisterFunctionInstance(String fid,
-                                           String eid) {
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null != entry) {
-                if (entry.unregister(eid)) {
-                    cacheFunctions.remove(fid);
-                    entry.close();
-                }
+    public void unregisterFunctionInstance(String fid, String eid) {
+        FunctionCacheEntry entry = cacheFunctions.get(fid);
+        if (null != entry) {
+            if (entry.unregister(eid)) {
+                cacheFunctions.remove(fid);
+                entry.close();
             }
         }

Review comment:
       `get` and `remove` are two separate atomic operations, so the sequence as a whole is not atomic. We can use a single `remove` instead.
   
   ```suggestion
           FunctionCacheEntry entry = cacheFunctions.remove(fid);
           if (entry != null && entry.unregister(eid)) {
               entry.close();
           }
   ```
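
   Note that calling `remove` first drops the mapping even when `unregister` reports that other instances still use the entry. One possible alternative, assuming `unregister` returns true only once no instances remain (as the original code implies), is `computeIfPresent`, which removes the mapping when the remapping function returns null. `UnregisterSketch` and its nested `Entry` are hypothetical stand-ins, not the real `FunctionCacheEntry`.

   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   final class UnregisterSketch {
       // Hypothetical stand-in for FunctionCacheEntry.
       static final class Entry {
           private final Set<String> instances = new HashSet<>();
           private boolean closed;

           Entry(String eid) {
               instances.add(eid);
           }

           synchronized void register(String eid) {
               instances.add(eid);
           }

           // Returns true when no instance references the entry any more.
           synchronized boolean unregister(String eid) {
               instances.remove(eid);
               return instances.isEmpty();
           }

           synchronized void close() {
               closed = true;
           }

           synchronized boolean isClosed() {
               return closed;
           }
       }

       final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<>();

       void unregister(String fid, String eid) {
           // The remapping function runs atomically for this key: returning null
           // removes the mapping, returning the entry keeps it.
           cache.computeIfPresent(fid, (key, entry) -> {
               if (entry.unregister(eid)) {
                   entry.close();
                   return null;
               }
               return entry;
           });
       }
   }
   ```

   If `close()` were slow in practice, it might be better to record the entry inside the function and close it after `computeIfPresent` returns, since other updates to the same bin can be blocked while the function runs.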







[GitHub] [pulsar] gaozhangmin closed pull request #14346: [Pulsar-Functions] Use ComputeIfAbsent instead in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin closed pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346


   





[GitHub] [pulsar] gaozhangmin commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1042891846


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaozhangmin commented on pull request #14346: [Pulsar-Functions]Remove unnecessary synchronized-block in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#issuecomment-1044229473


   > Don't use `putIfAbsent`. Use `computeIfAbsent`.
   > 
   > For example,
   > 
   > ```java
   >     private static String newValue() {
   >         System.out.println("Call newValue");
   >         return "hello";
   >     }
   > 
   >     public static void main(String[] args) {
   >         final Map<String, String> map = new ConcurrentHashMap<>();
   >         System.out.println(map.putIfAbsent("A", newValue()));
   >         System.out.println(map.putIfAbsent("A", newValue()));
   >     }
   > ```
   > 
   > The output is:
   > 
   > ```
   > Call newValue
   > null
   > Call newValue
   > hello
   > ```
   > 
   > But if you use
   > 
   > ```java
   >         System.out.println(map.computeIfAbsent("A", __ -> newValue()));
   >         System.out.println(map.computeIfAbsent("A", __ -> newValue()));
   > ```
   > 
   > The output will be:
   > 
   > ```
   > Call newValue
   > hello
   > hello
   > ```
   > 
   > You can see `computeIfAbsent` avoids computing the value if the key already exists. Though the returned value is always non-null. So I suggested capturing an atomic boolean in the function of `computeIfAbsent` like:
   > 
   > ```java
   >         final AtomicBoolean computed = new AtomicBoolean(false);
   >         map.computeIfAbsent("A", __ -> {
   >             computed.set(true);
   >             return newValue();
   >         });
   > ```
   
   We need to use `putIfAbsent` here. As in the previous logic, we register on the entry only if the key already existed.
   If we use `computeIfAbsent`, the return value is either the previous value or the newly computed value, so we cannot tell whether the key existed before. @BewareMyPower
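
   A third option that avoids both the eager construction of `putIfAbsent` and the extra flag for `computeIfAbsent` is `compute`, whose remapping function receives the existing value (or null) and so can branch on whether the key was present. This is only a sketch: `RegisterSketch` and its nested `Entry` are hypothetical stand-ins for the cache manager and `FunctionCacheEntry`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ConcurrentHashMap;

   final class RegisterSketch {
       // Hypothetical stand-in for FunctionCacheEntry: records registered instance IDs.
       static final class Entry {
           private final List<String> eids = new ArrayList<>();

           Entry(String eid) {
               eids.add(eid);
           }

           synchronized void register(String eid) {
               eids.add(eid);
           }

           synchronized List<String> eids() {
               return new ArrayList<>(eids);
           }
       }

       final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<>();

       void register(String fid, String eid) {
           cache.compute(fid, (key, existing) -> {
               if (existing == null) {
                   return new Entry(eid); // key was absent: create the entry
               }
               existing.register(eid);    // key was present: register on the old entry
               return existing;
           });
       }
   }
   ```

   A constructor that throws checked exceptions, as the `AtomicReference<Throwable>` in the diff suggests, would still need to be wrapped inside the lambda and rethrown afterwards.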





[GitHub] [pulsar] gaozhangmin closed pull request #14346: [Pulsar-Functions] Use ComputeIfAbsent instead in FunctionCacheManagerImpl

Posted by GitBox <gi...@apache.org>.
gaozhangmin closed pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346


   

