You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/08/05 07:01:30 UTC

[GitHub] [hadoop] virajjasani commented on a change in pull request #3266: HADOOP-17835. Use CuratorCache implementation instead of PathChildrenCache / TreeCache

virajjasani commented on a change in pull request #3266:
URL: https://github.com/apache/hadoop/pull/3266#discussion_r683182286



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
##########
@@ -363,105 +357,122 @@ public void startThreads() throws IOException {
       throw new RuntimeException("Could not create ZK paths");
     }
     try {
-      keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
+      keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT)
+          .build();
       if (keyCache != null) {
-        keyCache.start(StartMode.BUILD_INITIAL_CACHE);
-        keyCache.getListenable().addListener(new PathChildrenCacheListener() {
-          @Override
-          public void childEvent(CuratorFramework client,
-              PathChildrenCacheEvent event)
-              throws Exception {
-            switch (event.getType()) {
-            case CHILD_ADDED:
-              processKeyAddOrUpdate(event.getData().getData());
-              break;
-            case CHILD_UPDATED:
-              processKeyAddOrUpdate(event.getData().getData());
-              break;
-            case CHILD_REMOVED:
-              processKeyRemoved(event.getData().getPath());
-              break;
-            default:
-              break;
-            }
-          }
-        }, listenerThreadPool);
+        CuratorCacheListener keyCacheListener = CuratorCacheListener.builder()
+            .forCreates(childData -> {
+              try {
+                processKeyAddOrUpdate(childData.getData());
+              } catch (IOException e) {
+                LOG.error("Error while processing Curator keyCacheListener "
+                    + "NODE_CREATED event");
+                throw new UncheckedIOException(e);
+              }
+            })
+            .forChanges((oldNode, node) -> {
+              try {
+                processKeyAddOrUpdate(node.getData());
+              } catch (IOException e) {
+                LOG.error("Error while processing Curator keyCacheListener "
+                    + "NODE_CHANGED event");
+                throw new UncheckedIOException(e);
+              }
+            })
+            .forDeletes(childData -> processKeyRemoved(childData.getPath()))
+            .build();
+        keyCache.listenable().addListener(keyCacheListener);
+        keyCache.start();
         loadFromZKCache(false);
       }
     } catch (Exception e) {
-      throw new IOException("Could not start PathChildrenCache for keys", e);
+      throw new IOException("Could not start Curator keyCacheListener for keys",
+          e);
     }
     if (isTokenWatcherEnabled) {
       LOG.info("TokenCache is enabled");
       try {
-        tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
+        tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT)
+            .build();
         if (tokenCache != null) {
-          tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
-          tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
-
-            @Override
-            public void childEvent(CuratorFramework client,
-                                   PathChildrenCacheEvent event) throws Exception {
-              switch (event.getType()) {
-                case CHILD_ADDED:
-                  processTokenAddOrUpdate(event.getData().getData());
-                  break;
-                case CHILD_UPDATED:
-                  processTokenAddOrUpdate(event.getData().getData());
-                  break;
-                case CHILD_REMOVED:
-                  processTokenRemoved(event.getData());
-                  break;
-                default:
-                  break;
-              }
-            }
-          }, listenerThreadPool);
+          CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder()
+              .forCreates(childData -> {
+                try {
+                  processTokenAddOrUpdate(childData.getData());
+                } catch (IOException e) {
+                  LOG.error("Error while processing Curator tokenCacheListener "
+                      + "NODE_CREATED event");
+                  throw new UncheckedIOException(e);
+                }
+              })
+              .forChanges((oldNode, node) -> {
+                try {
+                  processTokenAddOrUpdate(node.getData());
+                } catch (IOException e) {
+                  LOG.error("Error while processing Curator tokenCacheListener "
+                      + "NODE_CHANGED event");
+                  throw new UncheckedIOException(e);
+                }
+              })
+              .forDeletes(childData -> {
+                try {
+                  processTokenRemoved(childData);
+                } catch (IOException e) {
+                  LOG.error("Error while processing Curator tokenCacheListener "
+                      + "NODE_DELETED event");
+                  throw new UncheckedIOException(e);
+                }
+              })
+              .build();
+          tokenCache.listenable().addListener(tokenCacheListener);
+          tokenCache.start();
           loadFromZKCache(true);
         }
       } catch (Exception e) {
-        throw new IOException("Could not start PathChildrenCache for tokens", e);
+        throw new IOException(
+            "Could not start Curator tokenCacheListener for tokens", e);
       }
     }
     super.startThreads();
   }
 
   /**
-   * Load the PathChildrenCache into the in-memory map. Possible caches to be
+   * Load the CuratorCache into the in-memory map. Possible caches to be
    * loaded are keyCache and tokenCache.
    *
    * @param isTokenCache true if loading tokenCache, false if loading keyCache.
    */
   private void loadFromZKCache(final boolean isTokenCache) {
     final String cacheName = isTokenCache ? "token" : "key";
     LOG.info("Starting to load {} cache.", cacheName);
-    final List<ChildData> children;
+    final Stream<ChildData> children;
     if (isTokenCache) {
-      children = tokenCache.getCurrentData();
+      children = tokenCache.stream();
     } else {
-      children = keyCache.getCurrentData();
+      children = keyCache.stream();
     }
 
-    int count = 0;
-    for (ChildData child : children) {
+    final AtomicInteger count = new AtomicInteger(0);
+    children.forEach(childData -> {

Review comment:
       I think it's fine even with serial (as we were doing before this change in for loop), but in case you are wondering why we are using AtomicInteger here, it's because lambda allows only final or eventually final variables inside the block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org