You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2022/04/18 07:44:12 UTC

[druid] branch master updated: Fail fast incase a lookup load fails (#12397)

This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new de9f12b5c6 Fail fast incase a lookup load fails (#12397)
de9f12b5c6 is described below

commit de9f12b5c6d6b1a6096c0eec04097101e4792878
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Mon Apr 18 13:14:02 2022 +0530

    Fail fast incase a lookup load fails (#12397)
    
    Currently, while loading a lookup for the first time, the loading thread blocks
    for `waitForFirstRunMs` in case the lookup fails to load. If `waitForFirstRunMs`
    is long (e.g. 10 minutes), such blocking can slow down the loading of other lookups.
    
    This commit allows the thread to progress as soon as the loading of the lookup fails.
---
 .../lookup/namespace/cache/CacheScheduler.java     | 27 +++++++--
 .../lookup/namespace/cache/CacheSchedulerTest.java | 66 +++++++++++++++++++++-
 2 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
index 43d85d1389..cd21738f66 100644
--- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
+++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
@@ -37,10 +37,12 @@ import javax.annotation.Nullable;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -151,6 +153,7 @@ public final class CacheScheduler
     private final CacheGenerator<T> cacheGenerator;
     private final ConcurrentAwaitableCounter updateCounter = new ConcurrentAwaitableCounter();
     private final CountDownLatch startLatch = new CountDownLatch(1);
+    private final CompletableFuture<Boolean> firstLoadFinishedSuccessfully = new CompletableFuture<>();
 
     private EntryImpl(final T namespace, final Entry<T> entry, final CacheGenerator<T> cacheGenerator)
     {
@@ -185,13 +188,14 @@ public final class CacheScheduler
 
     private void updateCache()
     {
+      boolean updatedCacheSuccessfully = false;
       try {
         // Ensures visibility of the whole EntryImpl's state (fields and their state).
         startLatch.await();
         CacheState currentCacheState = cacheStateHolder.get();
         if (!Thread.currentThread().isInterrupted() && currentCacheState != NoCache.ENTRY_CLOSED) {
           final String currentVersion = currentVersionOrNull(currentCacheState);
-          tryUpdateCache(currentVersion);
+          updatedCacheSuccessfully = tryUpdateCache(currentVersion);
         }
       }
       catch (Throwable t) {
@@ -205,9 +209,14 @@ public final class CacheScheduler
           throw new RuntimeException(t);
         }
       }
+      finally {
+        if (!firstLoadFinishedSuccessfully.isDone()) {
+          firstLoadFinishedSuccessfully.complete(updatedCacheSuccessfully);
+        }
+      }
     }
 
-    private void tryUpdateCache(String currentVersion) throws Exception
+    private boolean tryUpdateCache(String currentVersion) throws Exception
     {
       boolean updatedCacheSuccessfully = false;
       CacheHandler newCache = null;
@@ -253,6 +262,7 @@ public final class CacheScheduler
           throw t;
         }
       }
+      return updatedCacheSuccessfully;
     }
 
     private String currentVersionOrNull(CacheState currentCacheState)
@@ -467,22 +477,31 @@ public final class CacheScheduler
   @Nullable
   public Entry scheduleAndWait(ExtractionNamespace namespace, long waitForFirstRunMs) throws InterruptedException
   {
+    Exception loadException = null;
     final Entry entry = schedule(namespace);
     log.debug("Scheduled new %s", entry);
     boolean success = false;
     try {
-      success = entry.impl.updateCounter.awaitFirstIncrement(waitForFirstRunMs, TimeUnit.MILLISECONDS);
+      success = (boolean) entry.impl.firstLoadFinishedSuccessfully.get(waitForFirstRunMs, TimeUnit.MILLISECONDS);
       if (success) {
         return entry;
       } else {
         return null;
       }
     }
+    catch (ExecutionException | TimeoutException e) {
+      loadException = e;
+      return null;
+    }
     finally {
       if (!success) {
         // ExecutionException's cause is logged in entry.close()
         entry.close();
-        log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
+        if (loadException != null) {
+          log.error(loadException, "CacheScheduler[%s] - problem during start or waiting for the first run", entry);
+        } else {
+          log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
+        }
       }
     }
   }
diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
index 51412d71e6..44289b0483 100644
--- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
+++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
@@ -22,15 +22,20 @@ package org.apache.druid.server.lookup.namespace.cache;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.query.lookup.namespace.CacheGenerator;
+import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
 import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
 import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.joda.time.Period;
@@ -54,6 +59,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -144,7 +150,9 @@ public class CacheSchedulerTest
         new NoopServiceEmitter(),
         ImmutableMap.of(
             UriExtractionNamespace.class,
-            cacheGenerator
+            cacheGenerator,
+            JdbcExtractionNamespace.class,
+            new JdbcCacheGenerator()
         ),
         cacheManager
     );
@@ -412,6 +420,62 @@ public class CacheSchedulerTest
     Assert.assertEquals(0, scheduler.getActiveEntries());
   }
 
+  @Test(timeout = 60_000L)
+  public void testSimpleSubmissionSuccessWithWait() throws InterruptedException
+  {
+    UriExtractionNamespace namespace = new UriExtractionNamespace(
+        tmpFile.toURI(),
+        null, null,
+        new UriExtractionNamespace.ObjectMapperFlatDataParser(
+            UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
+        ),
+        new Period(0),
+        null,
+        null
+    );
+    CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 10_000L);
+    waitFor(entry);
+    Assert.assertEquals(VALUE, entry.getCache().get(KEY));
+  }
+
+
+  @Test(timeout = 20_000L)
+  public void testSimpleSubmissionFailureWithWait() throws InterruptedException
+  {
+    JdbcExtractionNamespace namespace = new JdbcExtractionNamespace(
+        new MetadataStorageConnectorConfig()
+        {
+          @Override
+          public String getConnectURI()
+          {
+            return "jdbc:mysql://dummy:3306/db";
+          }
+        },
+        "foo",
+        "k",
+        "val",
+        "time",
+        "some filter",
+        new Period(10_000),
+        null,
+        new JdbcAccessSecurityConfig()
+        {
+          @Override
+          public Set<String> getAllowedProperties()
+          {
+            return ImmutableSet.of("valid_key1", "valid_key2");
+          }
+
+          @Override
+          public boolean isEnforceAllowedProperties()
+          {
+            return true;
+          }
+        }
+    );
+    scheduler.scheduleAndWait(namespace, 40_000L);
+  }
+
   private void scheduleDanglingEntry() throws InterruptedException
   {
     CacheScheduler.Entry entry = scheduler.schedule(getUriExtractionNamespace(5));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org