You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/09/10 03:53:34 UTC

[incubator-druid] branch 0.16.0-incubating updated: Exit JVM on curator unhandled errors (#8458) (#8498)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.16.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.16.0-incubating by this push:
     new 65f5ac2  Exit JVM on curator unhandled errors (#8458) (#8498)
65f5ac2 is described below

commit 65f5ac23021e00c14d7e53396396b3ad9d6f08a2
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Sep 9 20:53:22 2019 -0700

    Exit JVM on curator unhandled errors (#8458) (#8498)
    
    * Exit JVM on curator unhandled errors
    
    If an unhandled error occurs when curator is talking to ZooKeeper, exit
    the JVM in addition to stopping the lifecycle to prevent the process
    from being left in a zombie state. With this change,
    BoundedExponentialBackoffRetryWithQuit is no longer needed as when
    curator exceeds the configured retries, it triggers its unhandled error
    listeners. A new "connectionTimeoutMs" CuratorConfig setting is added
    mostly to facilitate testing curator unhandled errors, but it may be
    useful for users as well.
    
    * Address review comments
---
 .../druid/testing/junit/LoggerCaptureRule.java     | 106 ++++++++++++++++++++
 docs/configuration/index.md                        |   2 +-
 pom.xml                                            |   6 ++
 server/pom.xml                                     |   5 +
 .../BoundedExponentialBackoffRetryWithQuit.java    |  63 ------------
 .../org/apache/druid/curator/CuratorConfig.java    |  43 ++++----
 .../org/apache/druid/curator/CuratorModule.java    |  82 ++++-----------
 ...BoundedExponentialBackoffRetryWithQuitTest.java | 111 ---------------------
 .../apache/druid/curator/CuratorModuleTest.java    | 104 +++++++++++++++----
 9 files changed, 242 insertions(+), 280 deletions(-)

diff --git a/core/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java b/core/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java
new file mode 100644
index 0000000..79c5643
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.junit;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.junit.rules.ExternalResource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * JUnit rule that captures a class's logger output in an in-memory buffer so tests can verify logged messages.
+ * Captured events are discarded after each test.
+ *
+ * <p>log4j resolves the target class to the nearest configured {@link LoggerConfig}. If the class has no logger
+ * config of its own, that is an ancestor's (possibly the root's), and events from every logger sharing it are
+ * captured too, so tests should search the captured events rather than rely on their position.
+ */
+public class LoggerCaptureRule extends ExternalResource
+{
+  private final Class<?> targetClass;
+
+  private InMemoryAppender inMemoryAppender;
+  private LoggerConfig targetClassLoggerConfig;
+
+  public LoggerCaptureRule(Class<?> targetClass)
+  {
+    this.targetClass = targetClass;
+  }
+
+  @Override
+  protected void before()
+  {
+    inMemoryAppender = new InMemoryAppender();
+    // Appenders are created stopped and log4j reports appends to a non-started appender as errors; start it first.
+    inMemoryAppender.start();
+    LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+    Configuration configuration = loggerContext.getConfiguration();
+    targetClassLoggerConfig = configuration.getLoggerConfig(targetClass.getName());
+    targetClassLoggerConfig.addAppender(inMemoryAppender, Level.ALL, null);
+  }
+
+  @Override
+  protected void after()
+  {
+    // Detach before clearing so no event can slip into the buffer after it has been emptied.
+    targetClassLoggerConfig.removeAppender(InMemoryAppender.NAME);
+    inMemoryAppender.stop();
+    clearLogEvents();
+  }
+
+  /**
+   * Returns a snapshot of the events captured so far. Later events do not show up in the returned list, so it can be
+   * iterated safely even while other threads keep logging.
+   */
+  public List<LogEvent> getLogEvents()
+  {
+    return inMemoryAppender.getLogEvents();
+  }
+
+  /** Discards all events captured so far. */
+  public void clearLogEvents()
+  {
+    inMemoryAppender.clearLogEvents();
+  }
+
+  /** Appender that keeps every event it receives in memory. */
+  private static class InMemoryAppender extends AbstractAppender
+  {
+    static final String NAME = InMemoryAppender.class.getName();
+
+    // Synchronized because loggers may append from background threads while the test thread reads or clears.
+    private final List<LogEvent> logEvents = Collections.synchronizedList(new ArrayList<>());
+
+    InMemoryAppender()
+    {
+      super(NAME, null, null);
+    }
+
+    @Override
+    public void append(LogEvent logEvent)
+    {
+      logEvents.add(logEvent);
+    }
+
+    List<LogEvent> getLogEvents()
+    {
+      // ArrayList's copy constructor calls toArray(), which the synchronized wrapper performs under its lock.
+      return Collections.unmodifiableList(new ArrayList<>(logEvents));
+    }
+
+    void clearLogEvents()
+    {
+      logEvents.clear();
+    }
+  }
+}
+
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 7d8aaaa..aa1cd93 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -107,13 +107,13 @@ We recommend just setting the base ZK path and the ZK service host, but all ZK p
 |`druid.zk.service.user`|The username to authenticate with ZooKeeper. This is an optional property.|none|
 |`druid.zk.service.pwd`|The [Password Provider](../operations/password-provider.md) or the string password to authenticate with ZooKeeper. This is an optional property.|none|
 |`druid.zk.service.authScheme`|digest is the only authentication scheme supported. |digest|
-|`druid.zk.service.terminateDruidProcessOnConnectFail`|If set to 'true' and the connection to ZooKeeper fails (after exhausting all potential backoff retires), Druid process terminates itself with exit code 1.|false|
 
 #### Zookeeper Behavior
 
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.zk.service.sessionTimeoutMs`|ZooKeeper session timeout, in milliseconds.|`30000`|
+|`druid.zk.service.connectionTimeoutMs`|ZooKeeper connection timeout, in milliseconds.|`15000`|
 |`druid.zk.service.compress`|Boolean flag for whether or not created Znodes should be compressed.|`true`|
 |`druid.zk.service.acl`|Boolean flag for whether or not to enable ACL security for ZooKeeper. If ACL is enabled, zNode creators will have all permissions.|`false`|
 
diff --git a/pom.xml b/pom.xml
index 97473f7..095de02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -967,6 +967,12 @@
                 <version>${guava.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>com.github.stefanbirkner</groupId>
+                <artifactId>system-rules</artifactId>
+                <version>1.19.0</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/server/pom.xml b/server/pom.xml
index ef0276f..873725d 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -265,6 +265,11 @@
             <artifactId>JUnitParams</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.stefanbirkner</groupId>
+            <artifactId>system-rules</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/server/src/main/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuit.java b/server/src/main/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuit.java
deleted file mode 100644
index 531edb4..0000000
--- a/server/src/main/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuit.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.curator;
-
-import org.apache.curator.RetrySleeper;
-import org.apache.curator.retry.BoundedExponentialBackoffRetry;
-import org.apache.druid.java.util.common.logger.Logger;
-
-/**
- * BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry for simplicity. It's not actually a
- * BoundedExponentialBackoffRetry from the Liskov substitution principle point of view,
- * but it doesn't matter in this code.
- *
- */
-public class BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry
-{
-
-  private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuit.class);
-
-  private final Runnable exitRunner;
-
-  public BoundedExponentialBackoffRetryWithQuit(
-      Runnable exitRunner,
-      int baseSleepTimeMs,
-      int maxSleepTimeMs,
-      int maxRetries
-  )
-  {
-    super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
-    this.exitRunner = exitRunner;
-    log.info("BoundedExponentialBackoffRetryWithQuit Retry Policy selected.");
-  }
-
-  @Override
-  public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
-  {
-    log.warn("Zookeeper can't be reached, retrying (retryCount = %s out of %s)...", retryCount, this.getN());
-    boolean shouldRetry = super.allowRetry(retryCount, elapsedTimeMs, sleeper);
-    if (!shouldRetry) {
-      log.warn("Since Zookeeper can't be reached after retries exhausted, calling exit function...");
-      exitRunner.run();
-    }
-    return shouldRetry;
-  }
-
-}
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
index 52af44e..d632523 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
@@ -26,16 +26,20 @@ import org.apache.druid.metadata.PasswordProvider;
 
 import javax.validation.constraints.Min;
 
-/**
- */
 public class CuratorConfig
 {
-  @JsonProperty("host")
+  static final String HOST = "host";
+  @JsonProperty(HOST)
   private String zkHosts = "localhost";
 
   @JsonProperty("sessionTimeoutMs")
   @Min(0)
-  private int zkSessionTimeoutMs = 30000;
+  private int zkSessionTimeoutMs = 30_000;
+
+  static final String CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
+  @JsonProperty(CONNECTION_TIMEOUT_MS)
+  @Min(0)
+  private int zkConnectionTimeoutMs = 15_000;  // same as Curator default: https://git.io/fjhhr
 
   @JsonProperty("compress")
   private boolean enableCompression = true;
@@ -52,10 +56,6 @@ public class CuratorConfig
   @JsonProperty("authScheme")
   private String authScheme = "digest";
 
-  @JsonProperty("terminateDruidProcessOnConnectFail")
-  private boolean terminateDruidProcessOnConnectFail = false;
-
-
   public String getZkHosts()
   {
     return zkHosts;
@@ -66,7 +66,7 @@ public class CuratorConfig
     this.zkHosts = zkHosts;
   }
 
-  public Integer getZkSessionTimeoutMs()
+  public int getZkSessionTimeoutMs()
   {
     return zkSessionTimeoutMs;
   }
@@ -76,6 +76,25 @@ public class CuratorConfig
     this.zkSessionTimeoutMs = zkSessionTimeoutMs;
   }
 
+  /**
+   * Returns the ZooKeeper connection timeout in milliseconds, bound from the {@value #CONNECTION_TIMEOUT_MS}
+   * property. {@code CuratorModule} passes it to Curator as the client's connection timeout.
+   */
+  public int getZkConnectionTimeoutMs()
+  {
+    return zkConnectionTimeoutMs;
+  }
+
+  /**
+   * Overrides the ZooKeeper connection timeout, in milliseconds.
+   * NOTE(review): the argument is unboxed into an {@code int} field, so passing {@code null} throws a
+   * {@link NullPointerException}.
+   */
+  public void setZkConnectionTimeoutMs(Integer timeoutMs)
+  {
+    zkConnectionTimeoutMs = timeoutMs;
+  }
+
   public boolean getEnableCompression()
   {
     return enableCompression;
@@ -112,19 +122,4 @@ public class CuratorConfig
   {
     return authScheme;
   }
-
-  public boolean getTerminateDruidProcessOnConnectFail()
-  {
-    return terminateDruidProcessOnConnectFail;
-  }
-
-  public void setTerminateDruidProcessOnConnectFail(Boolean terminateDruidProcessOnConnectFail)
-  {
-    if (terminateDruidProcessOnConnectFail == null) {
-      this.terminateDruidProcessOnConnectFail = false;
-    } else {
-      this.terminateDruidProcessOnConnectFail = terminateDruidProcessOnConnectFail;
-    }
-  }
-
 }
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java
index 2c27dba..3095991 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java
@@ -31,7 +31,6 @@ import org.apache.curator.ensemble.exhibitor.Exhibitors;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.retry.BoundedExponentialBackoffRetry;
@@ -47,8 +46,6 @@ import org.apache.zookeeper.data.ACL;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
-/**
- */
 public class CuratorModule implements Module
 {
   static final String CURATOR_CONFIG_PREFIX = "druid.zk.service";
@@ -75,7 +72,7 @@ public class CuratorModule implements Module
   @SuppressForbidden(reason = "System#err")
   public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
   {
-    final Builder builder = CuratorFrameworkFactory.builder();
+    final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
     if (!Strings.isNullOrEmpty(config.getZkUser()) && !Strings.isNullOrEmpty(config.getZkPwd())) {
       builder.authorization(
           config.getAuthScheme(),
@@ -83,31 +80,12 @@ public class CuratorModule implements Module
       );
     }
 
-    RetryPolicy retryPolicy;
-    if (config.getTerminateDruidProcessOnConnectFail()) {
-      final Runnable exitRunner = () -> {
-        try {
-          log.error("Zookeeper can't be reached, forcefully stopping lifecycle...");
-          lifecycle.stop();
-          System.err.println("Zookeeper can't be reached, forcefully stopping virtual machine...");
-        }
-        finally {
-          System.exit(1);
-        }
-      };
-      retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
-          exitRunner,
-          BASE_SLEEP_TIME_MS,
-          MAX_SLEEP_TIME_MS,
-          MAX_RETRIES
-      );
-    } else {
-      retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
-    }
+    RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
 
     final CuratorFramework framework = builder
         .ensembleProvider(ensembleProvider)
         .sessionTimeoutMs(config.getZkSessionTimeoutMs())
+        .connectionTimeoutMs(config.getZkConnectionTimeoutMs())
         .retryPolicy(retryPolicy)
         .compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
         .aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
@@ -115,12 +93,7 @@ public class CuratorModule implements Module
 
     framework.getUnhandledErrorListenable().addListener((message, e) -> {
       log.error(e, "Unhandled error in Curator Framework");
-      try {
-        lifecycle.stop();
-      }
-      catch (Throwable t) {
-        log.warn(t, "Exception when stopping druid lifecycle");
-      }
+      shutdown(lifecycle);
     });
 
     lifecycle.addHandler(
@@ -153,29 +126,7 @@ public class CuratorModule implements Module
       return new FixedEnsembleProvider(config.getZkHosts());
     }
 
-    RetryPolicy retryPolicy;
-    if (config.getTerminateDruidProcessOnConnectFail()) {
-      // It's unknown whether or not this precaution is needed.  Tests revealed that this path was never taken.
-      //  see discussions in https://github.com/apache/incubator-druid/pull/6740
-
-      final Runnable exitRunner = () -> {
-        try {
-          log.error("Zookeeper can't be reached, forcefully stopping virtual machine...");
-        }
-        finally {
-          System.exit(1);
-        }
-      };
-
-      retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
-          exitRunner,
-          BASE_SLEEP_TIME_MS,
-          MAX_SLEEP_TIME_MS,
-          MAX_RETRIES
-      );
-    } else {
-      retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
-    }
+    RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES);
 
     return new ExhibitorEnsembleProvider(
         new Exhibitors(
@@ -201,14 +152,7 @@ public class CuratorModule implements Module
 
   private Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts)
   {
-    return new Exhibitors.BackupConnectionStringProvider()
-    {
-      @Override
-      public String getBackupConnectionString()
-      {
-        return zkHosts;
-      }
-    };
+    return () -> zkHosts;
   }
 
   static class SecuredACLProvider implements ACLProvider
@@ -225,4 +169,23 @@ public class CuratorModule implements Module
       return ZooDefs.Ids.CREATOR_ALL_ACL;
     }
   }
+
+  /**
+   * Stops the given lifecycle and then terminates the JVM with exit status 1. A failure while stopping is logged but
+   * never prevents the exit, so the process is not left running in a half-stopped state.
+   */
+  private void shutdown(Lifecycle lifecycle)
+  {
+    //noinspection finally (not completing the 'finally' block normally is intentional)
+    try {
+      lifecycle.stop();
+    }
+    catch (Throwable stopFailure) {
+      log.error(stopFailure, "Exception when stopping druid lifecycle");
+    }
+    finally {
+      // Exit even if logging the stop failure itself throws.
+      System.exit(1);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuitTest.java b/server/src/test/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuitTest.java
deleted file mode 100644
index a5fb4fe..0000000
--- a/server/src/test/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuitTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.curator;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.test.TestingServer;
-import org.apache.druid.java.util.common.lifecycle.Lifecycle;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Test;
-
-public final class BoundedExponentialBackoffRetryWithQuitTest
-{
-
-  private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuitTest.class);
-
-  /*
-  Methodology (order is important!):
-    1. Zookeeper Server Service started
-    2. Lifecycle started
-    3. Curator invokes connection to service
-    4. Service is stopped
-    5. Curator attempts to do something, which invokes the retries policy
-    6. Retries exceed limit, call function which simulates an exit (since mocking System.exit() is hard to do without
-        changing a lot of dependencies)
-   */
-  @Test
-  public void testExitWithLifecycle() throws Exception
-  {
-    final Lifecycle actualNoop = new Lifecycle() {
-      @Override
-      public void start() throws Exception
-      {
-        super.start();
-        log.info("Starting lifecycle...");
-      }
-
-      @Override
-      public void stop()
-      {
-        super.stop();
-        log.info("Stopping lifecycle...");
-      }
-    };
-    Lifecycle noop = EasyMock.mock(Lifecycle.class);
-
-    noop.start();
-    EasyMock.expectLastCall().andDelegateTo(actualNoop);
-    noop.stop();
-    EasyMock.expectLastCall().andDelegateTo(actualNoop);
-    EasyMock.replay(noop);
-
-    Runnable exitFunction = () -> {
-      log.info("Zookeeper retries exhausted, exiting...");
-      noop.stop();
-      throw new RuntimeException("Simulated exit");
-    };
-
-    TestingServer server = new TestingServer();
-    BoundedExponentialBackoffRetryWithQuit retry = new BoundedExponentialBackoffRetryWithQuit(exitFunction, 1, 1, 2);
-    CuratorFramework curator = CuratorFrameworkFactory
-        .builder()
-        .connectString(server.getConnectString())
-        .sessionTimeoutMs(1000)
-        .connectionTimeoutMs(1)
-        .retryPolicy(retry)
-        .build();
-    server.start();
-    System.out.println("Server started.");
-    curator.start();
-    noop.start();
-    curator.checkExists().forPath("/tmp");
-    log.info("Connected.");
-    boolean failed = false;
-    try {
-      server.stop();
-      log.info("Stopped.");
-      curator.checkExists().forPath("/tmp");
-      Thread.sleep(10);
-      curator.checkExists().forPath("/tmp");
-    }
-    catch (Exception e) {
-      Assert.assertTrue("Correct exception type", e instanceof RuntimeException);
-      EasyMock.verify(noop);
-      curator.close();
-      failed = true;
-    }
-    Assert.assertTrue("Must be marked in failure state", failed);
-    log.info("Lifecycle stopped.");
-  }
-
-}
diff --git a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java
index b4ff237..855de80 100644
--- a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java
+++ b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java
@@ -20,31 +20,44 @@
 package org.apache.druid.curator;
 
 import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.util.Modules;
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.testing.junit.LoggerCaptureRule;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.ExpectedSystemExit;
 
 import java.util.List;
 import java.util.Properties;
 
-/**
- */
 public final class CuratorModuleTest
 {
+  private static final String CURATOR_HOST_KEY = CuratorModule.CURATOR_CONFIG_PREFIX + "." + CuratorConfig.HOST;
+  private static final String CURATOR_CONNECTION_TIMEOUT_MS_KEY =
+      CuratorModule.CURATOR_CONFIG_PREFIX + "." + CuratorConfig.CONNECTION_TIMEOUT_MS;
+  private static final String EXHIBITOR_HOSTS_KEY = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts";
 
-  private static final String CURATOR_HOST_KEY = CuratorModule.CURATOR_CONFIG_PREFIX + ".host";
+  @Rule
+  public final ExpectedSystemExit exit = ExpectedSystemExit.none();
 
-  private static final String EXHIBITOR_HOSTS_KEY = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts";
+  @Rule
+  public final LoggerCaptureRule logger = new LoggerCaptureRule(CuratorModule.class);
 
   @Test
   public void defaultEnsembleProvider()
@@ -66,7 +79,7 @@ public final class CuratorModuleTest
   public void fixedZkHosts()
   {
     Properties props = new Properties();
-    props.put(CURATOR_HOST_KEY, "hostA");
+    props.setProperty(CURATOR_HOST_KEY, "hostA");
     Injector injector = newInjector(props);
 
     injector.getInstance(CuratorFramework.class); // initialize related components
@@ -85,8 +98,8 @@ public final class CuratorModuleTest
   public void exhibitorEnsembleProvider()
   {
     Properties props = new Properties();
-    props.put(CURATOR_HOST_KEY, "hostA");
-    props.put(EXHIBITOR_HOSTS_KEY, "[\"hostB\"]");
+    props.setProperty(CURATOR_HOST_KEY, "hostA");
+    props.setProperty(EXHIBITOR_HOSTS_KEY, "[\"hostB\"]");
     Injector injector = newInjector(props);
 
     injector.getInstance(CuratorFramework.class); // initialize related components
@@ -101,8 +114,8 @@ public final class CuratorModuleTest
   public void emptyExhibitorHosts()
   {
     Properties props = new Properties();
-    props.put(CURATOR_HOST_KEY, "hostB");
-    props.put(EXHIBITOR_HOSTS_KEY, "[]");
+    props.setProperty(CURATOR_HOST_KEY, "hostB");
+    props.setProperty(EXHIBITOR_HOSTS_KEY, "[]");
     Injector injector = newInjector(props);
 
     injector.getInstance(CuratorFramework.class); // initialize related components
@@ -117,21 +130,93 @@ public final class CuratorModuleTest
     );
   }
 
+  /**
+   * With a 0 ms connection timeout and retries disabled, a background operation ends in a Curator unhandled error; the
+   * listener installed by {@link CuratorModule} must log it and exit the JVM with status 1.
+   */
+  @Test
+  public void exitsJvmWhenMaxRetriesExceeded() throws Exception
+  {
+    Properties props = new Properties();
+    props.setProperty(CURATOR_CONNECTION_TIMEOUT_MS_KEY, "0");
+    Injector injector = newInjector(props);
+    CuratorFramework curatorFramework = createCuratorFramework(injector, 0);
+    curatorFramework.start();
+
+    exit.expectSystemExitWithStatus(1);
+    logger.clearLogEvents();
+
+    // This will result in a curator unhandled error since the connection timeout is 0 and retries are disabled
+    curatorFramework.create().inBackground().forPath("/foo");
+
+    // org.apache.curator.framework.impl.CuratorFrameworkImpl logs "Background retry gave up" unhandled error twice.
+    // Search every captured event instead of assuming ours is first: the capture rule attaches to the nearest
+    // configured logger config, which other loggers may share.
+    List<LogEvent> loggingEvents = logger.getLogEvents();
+    Assert.assertTrue(
+        "Logging events: " + loggingEvents,
+        loggingEvents.stream().anyMatch(
+            logEvent -> Level.ERROR.equals(logEvent.getLevel())
+                        && "Unhandled error in Curator Framework".equals(logEvent.getMessage().getFormattedMessage())
+        )
+    );
+  }
+
+  /**
+   * Setting the removed {@code terminateDruidProcessOnConnectFail} property must not make creation of the
+   * {@link CuratorFramework} fail.
+   */
+  @Ignore("Verifies changes in https://github.com/apache/incubator-druid/pull/8458, but overkill for regular testing")
+  @Test
+  public void ignoresDeprecatedCuratorConfigProperties()
+  {
+    Properties props = new Properties();
+    String deprecatedPropName = CuratorModule.CURATOR_CONFIG_PREFIX + ".terminateDruidProcessOnConnectFail";
+    props.setProperty(deprecatedPropName, "true");
+    Injector injector = newInjector(props);
+
+    try {
+      injector.getInstance(CuratorFramework.class);
+    }
+    catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported with the failure.
+      throw new AssertionError("Deprecated curator config was not ignored", e);
+    }
+  }
+
   private Injector newInjector(final Properties props)
   {
     List<Module> modules = ImmutableList.<Module>builder()
         .addAll(GuiceInjectors.makeDefaultStartupModules())
-        .add(new LifecycleModule()).add(new CuratorModule()).build();
+        .add(new LifecycleModule())
+        .add(new CuratorModule())
+        .build();
     return Guice.createInjector(
-        Modules.override(modules).with(new Module()
-        {
-          @Override
-          public void configure(Binder binder)
-          {
-            binder.bind(Properties.class).toInstance(props);
-          }
-        })
+        Modules.override(modules).with(binder -> binder.bind(Properties.class).toInstance(props))
     );
   }
 
+  /**
+   * Gets the injected {@link CuratorFramework} and replaces its retry policy with a copy of the module's bounded
+   * exponential backoff policy that allows at most {@code maxRetries} retries.
+   */
+  private static CuratorFramework createCuratorFramework(Injector injector, int maxRetries)
+  {
+    CuratorFramework curatorFramework = injector.getInstance(CuratorFramework.class);
+    RetryPolicy retryPolicy = curatorFramework.getZookeeperClient().getRetryPolicy();
+    Assert.assertThat(retryPolicy, CoreMatchers.instanceOf(ExponentialBackoffRetry.class));
+    RetryPolicy adjustedRetryPolicy = adjustRetryPolicy((BoundedExponentialBackoffRetry) retryPolicy, maxRetries);
+    curatorFramework.getZookeeperClient().setRetryPolicy(adjustedRetryPolicy);
+    return curatorFramework;
+  }
+
+  /** Returns a policy with the base and max sleep times of {@code origRetryPolicy} but the given retry limit. */
+  private static RetryPolicy adjustRetryPolicy(BoundedExponentialBackoffRetry origRetryPolicy, int maxRetries)
+  {
+    return new BoundedExponentialBackoffRetry(
+        origRetryPolicy.getBaseSleepTimeMs(),
+        origRetryPolicy.getMaxSleepTimeMs(),
+        maxRetries
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org