You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/05/05 19:52:11 UTC

[hadoop] branch trunk updated: HADOOP-11616. Remove workaround for Curator's ChildReaper requiring Guava 15+ (#2973)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b93e448  HADOOP-11616. Remove workaround for Curator's ChildReaper requiring Guava 15+ (#2973)
b93e448 is described below

commit b93e448f9aa66689f1ce5059f6cdce8add130457
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Thu May 6 01:22:02 2021 +0530

    HADOOP-11616. Remove workaround for Curator's ChildReaper requiring Guava 15+ (#2973)
    
    Reviewed-by: Wei-Chiu Chuang <we...@apache.org>
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
---
 .../apache/hadoop/util/curator/ChildReaper.java    | 234 ---------------------
 .../hadoop/util/curator/TestChildReaper.java       | 209 ------------------
 2 files changed, 443 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
deleted file mode 100644
index e125dbf..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.util.curator;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.curator.framework.recipes.locks.Reaper;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
-
-/**
- * This is a copy of Curator 2.7.1's ChildReaper class, modified to work with
- * Guava 11.0.2.  The problem is the 'paths' Collection, which calls Guava's
- * Sets.newConcurrentHashSet(), which was added in Guava 15.0.
- * <p>
- * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
- * the registered parent nodes and hands every child that currently has no children of
- * its own to an internally managed {@link Reaper}.
- * <p>
- * Lifecycle: an instance is created latent, may be {@link #start() started} exactly once
- * and is stopped by {@link #close()}. Closing an instance that was never started is a
- * no-op.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class ChildReaper implements Closeable
-{
-  private final Logger log = LoggerFactory.getLogger(getClass());
-  private final Reaper reaper;
-  private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-  private final CuratorFramework client;
-  private final Collection<String> paths = newConcurrentHashSet();
-  private final Reaper.Mode mode;
-  private final CloseableScheduledExecutorService executor;
-  private final int reapingThresholdMs;
-
-  // Non-null only when this instance created the executor itself (the
-  // constructors without an executor parameter). Such a pool has no other
-  // owner, so close() must shut it down or its threads are leaked.
-  private final ScheduledExecutorService ownedExecutor;
-
-  // Handle of the periodic scan; stays null until start() has scheduled it.
-  private volatile Future<?> task;
-
-  // This is copied from Curator's Reaper class
-  static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
-
-  // This is copied from Guava
-  /**
-   * Creates a thread-safe set backed by a hash map. The set is backed by a
-   * {@link ConcurrentHashMap} instance, and thus carries the same concurrency
-   * guarantees.
-   *
-   * <p>Unlike {@code HashSet}, this class does NOT allow {@code null} to be
-   * used as an element. The set is serializable.
-   *
-   * @param <E> element type of the returned set
-   * @return a new, empty thread-safe {@code Set}
-   * @since 15.0
-   */
-  public static <E> Set<E> newConcurrentHashSet() {
-    return Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
-  }
-
-  private enum State
-  {
-    LATENT,
-    STARTED,
-    CLOSED
-  }
-
-  /**
-   * Creates a reaper that uses its own executor and the default reaping threshold.
-   *
-   * @param client the client
-   * @param path path to reap children from
-   * @param mode reaping mode
-   */
-  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
-  {
-    this(client, path, mode, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null, true);
-  }
-
-  /**
-   * Creates a reaper that uses its own executor.
-   *
-   * @param client the client
-   * @param path path to reap children from
-   * @param mode reaping mode
-   * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-   */
-  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
-  {
-    this(client, path, mode, newExecutorService(), reapingThresholdMs, null, true);
-  }
-
-  /**
-   * Creates a reaper that runs on a caller-supplied executor. The executor stays owned by
-   * the caller; this class never shuts it down.
-   *
-   * @param client the client
-   * @param path path to reap children from
-   * @param mode reaping mode
-   * @param executor executor to use for background tasks
-   * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-   */
-  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
-  {
-    this(client, path, mode, executor, reapingThresholdMs, null, false);
-  }
-
-  /**
-   * Creates a reaper that runs on a caller-supplied executor. The executor stays owned by
-   * the caller; this class never shuts it down.
-   *
-   * @param client the client
-   * @param path path to reap children from
-   * @param mode reaping mode
-   * @param executor executor to use for background tasks
-   * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-   * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
-   */
-  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
-  {
-    this(client, path, mode, executor, reapingThresholdMs, leaderPath, false);
-  }
-
-  /**
-   * Common constructor behind all public ones.
-   *
-   * @param ownsExecutor true when {@code executor} was created by this class and must
-   *                     therefore be shut down by {@link #close()}
-   */
-  private ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, boolean ownsExecutor)
-  {
-    this.client = client;
-    this.mode = mode;
-    this.executor = new CloseableScheduledExecutorService(executor);
-    this.reapingThresholdMs = reapingThresholdMs;
-    this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
-    this.ownedExecutor = ownsExecutor ? executor : null;
-    addPath(path);
-  }
-
-  /**
-   * The reaper must be started. Schedules the periodic scan (first run after
-   * {@code reapingThresholdMs}, then with the same delay between runs) and starts the
-   * internal {@link Reaper}.
-   *
-   * @throws IllegalStateException if this instance was already started or closed
-   * @throws Exception errors
-   */
-  public void start() throws Exception
-  {
-    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-    task = executor.scheduleWithFixedDelay
-        (
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
-                doWork();
-              }
-            },
-            reapingThresholdMs,
-            reapingThresholdMs,
-            TimeUnit.MILLISECONDS
-        );
-
-    reaper.start();
-  }
-
-  /**
-   * Stops reaping: closes the internal {@link Reaper}, cancels the periodic scan
-   * (interrupting it if it is running) and, when the executor was created by this class,
-   * shuts that executor down. Has no effect unless the instance is currently started.
-   */
-  @Override
-  public void close() throws IOException
-  {
-    if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-    {
-      CloseableUtils.closeQuietly(reaper);
-
-      // The state flips to STARTED before start() assigns 'task', and stays
-      // STARTED if scheduling throws, so the handle may legitimately be null.
-      Future<?> scheduled = task;
-      if ( scheduled != null )
-      {
-        scheduled.cancel(true);
-      }
-
-      // NOTE(review): assumes closing the Reaper does not already terminate the
-      // shared pool; if it does, this extra shutdownNow() is a harmless no-op.
-      if ( ownedExecutor != null )
-      {
-        ownedExecutor.shutdownNow();
-      }
-    }
-  }
-
-  /**
-   * Add a path to reap children from
-   *
-   * @param path the path
-   * @return this for chaining
-   */
-  public ChildReaper addPath(String path)
-  {
-    paths.add(PathUtils.validatePath(path));
-    return this;
-  }
-
-  /**
-   * Remove a path from reaping
-   *
-   * @param path the path
-   * @return true if the path existed and was removed
-   */
-  public boolean removePath(String path)
-  {
-    return paths.remove(PathUtils.validatePath(path));
-  }
-
-  private static ScheduledExecutorService newExecutorService()
-  {
-    return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
-  }
-
-  /**
-   * One scan: for every registered parent path, lists its children and passes each child
-   * that exists and has no children of its own to the {@link Reaper}. A failure on one
-   * parent is logged and does not stop the scan of the others; an interruption ends the
-   * scan.
-   */
-  private void doWork()
-  {
-    for ( String path : paths )
-    {
-      try
-      {
-        List<String> children = client.getChildren().forPath(path);
-        for ( String name : children )
-        {
-          String thisPath = ZKPaths.makePath(path, name);
-          Stat stat = client.checkExists().forPath(thisPath);
-          if ( (stat != null) && (stat.getNumChildren() == 0) )
-          {
-            reaper.addPath(thisPath, mode);
-          }
-        }
-      }
-      catch ( InterruptedException e )
-      {
-        // close() cancels this task with interruption. Keep the interrupt
-        // status for the executor and stop instead of logging a spurious
-        // error and continuing with the remaining paths.
-        Thread.currentThread().interrupt();
-        return;
-      }
-      catch ( Exception e )
-      {
-        log.error("Could not get children for path: " + path, e);
-      }
-    }
-  }
-}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
deleted file mode 100644
index 9604718..0000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.curator;
-
-import org.apache.curator.framework.recipes.locks.Reaper;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.Timing;
-import org.apache.zookeeper.data.Stat;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.BindException;
-import java.util.Random;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * This is a copy of Curator 2.7.1's TestChildReaper class, with minor
- * modifications to make it work with JUnit (some setup code taken from
- * Curator's BaseClassForTests).  This is to ensure that the ChildReaper
- * class we modified is still correct.
- */
-public class TestChildReaper
-{
-  protected TestingServer server;
-
-  @Before
-  public void setup() throws Exception {
-    while(this.server == null) {
-      try {
-        this.server = new TestingServer();
-      } catch (BindException var2) {
-        System.err.println("Getting bind exception - retrying to allocate server");
-        this.server = null;
-      }
-    }
-  }
-
-  @After
-  public void teardown() throws Exception {
-    this.server.close();
-    this.server = null;
-  }
-
-  @Test
-  public void     testSomeNodes() throws Exception
-  {
-
-    Timing                  timing = new Timing();
-    ChildReaper             reaper = null;
-    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-    try
-    {
-      client.start();
-
-      Random              r = new Random();
-      int                 nonEmptyNodes = 0;
-      for ( int i = 0; i < 10; ++i )
-      {
-        client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-        if ( r.nextBoolean() )
-        {
-          client.create().forPath("/test/" + Integer.toString(i) + "/foo");
-          ++nonEmptyNodes;
-        }
-      }
-
-      reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-      reaper.start();
-
-      timing.forWaiting().sleepABit();
-
-      Stat    stat = client.checkExists().forPath("/test");
-      assertThat(stat.getNumChildren()).isEqualTo(nonEmptyNodes);
-    }
-    finally
-    {
-      CloseableUtils.closeQuietly(reaper);
-      CloseableUtils.closeQuietly(client);
-    }
-  }
-
-  @Test
-  public void     testSimple() throws Exception
-  {
-    Timing                  timing = new Timing();
-    ChildReaper             reaper = null;
-    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-    try
-    {
-      client.start();
-
-      for ( int i = 0; i < 10; ++i )
-      {
-        client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-      }
-
-      reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-      reaper.start();
-
-      timing.forWaiting().sleepABit();
-
-      Stat    stat = client.checkExists().forPath("/test");
-      assertThat(stat.getNumChildren()).isZero();
-    }
-    finally
-    {
-      CloseableUtils.closeQuietly(reaper);
-      CloseableUtils.closeQuietly(client);
-    }
-  }
-
-  @Test
-  public void     testMultiPath() throws Exception
-  {
-    Timing                  timing = new Timing();
-    ChildReaper             reaper = null;
-    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-    try
-    {
-      client.start();
-
-      for ( int i = 0; i < 10; ++i )
-      {
-        client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i));
-        client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i));
-        client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i));
-      }
-
-      reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-      reaper.start();
-      reaper.addPath("/test1");
-
-      timing.forWaiting().sleepABit();
-
-      Stat    stat = client.checkExists().forPath("/test1");
-      assertThat(stat.getNumChildren()).isZero();
-      stat = client.checkExists().forPath("/test2");
-      assertThat(stat.getNumChildren()).isZero();
-      stat = client.checkExists().forPath("/test3");
-      assertThat(stat.getNumChildren()).isEqualTo(10);
-    }
-    finally
-    {
-      CloseableUtils.closeQuietly(reaper);
-      CloseableUtils.closeQuietly(client);
-    }
-  }
-
-  @Test
-  public void     testNamespace() throws Exception
-  {
-    Timing                  timing = new Timing();
-    ChildReaper             reaper = null;
-    CuratorFramework        client = CuratorFrameworkFactory.builder()
-        .connectString(server.getConnectString())
-        .sessionTimeoutMs(timing.session())
-        .connectionTimeoutMs(timing.connection())
-        .retryPolicy(new RetryOneTime(1))
-        .namespace("foo")
-        .build();
-    try
-    {
-      client.start();
-
-      for ( int i = 0; i < 10; ++i )
-      {
-        client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-      }
-
-      reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-      reaper.start();
-
-      timing.forWaiting().sleepABit();
-
-      Stat    stat = client.checkExists().forPath("/test");
-      assertThat(stat.getNumChildren()).isZero();
-
-      stat = client.usingNamespace(null).checkExists().forPath("/foo/test");
-      assertThat(stat).isNotNull();
-      assertThat(stat.getNumChildren()).isZero();
-    }
-    finally
-    {
-      CloseableUtils.closeQuietly(reaper);
-      CloseableUtils.closeQuietly(client);
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org