You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2022/04/18 07:44:12 UTC
[druid] branch master updated: Fail fast incase a lookup load fails (#12397)
This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new de9f12b5c6 Fail fast incase a lookup load fails (#12397)
de9f12b5c6 is described below
commit de9f12b5c6d6b1a6096c0eec04097101e4792878
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Mon Apr 18 13:14:02 2022 +0530
Fail fast incase a lookup load fails (#12397)
Currently while loading a lookup for the first time, loading threads blocks
for `waitForFirstRunMs` incase the lookup failed to load. If the `waitForFirstRunMs`
is long (like 10 minutes), such blocking can slow down the loading of other lookups.
This commit allows the thread to progress as soon as the loading of the lookup fails.
---
.../lookup/namespace/cache/CacheScheduler.java | 27 +++++++--
.../lookup/namespace/cache/CacheSchedulerTest.java | 66 +++++++++++++++++++++-
2 files changed, 88 insertions(+), 5 deletions(-)
diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
index 43d85d1389..cd21738f66 100644
--- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
+++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java
@@ -37,10 +37,12 @@ import javax.annotation.Nullable;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -151,6 +153,7 @@ public final class CacheScheduler
private final CacheGenerator<T> cacheGenerator;
private final ConcurrentAwaitableCounter updateCounter = new ConcurrentAwaitableCounter();
private final CountDownLatch startLatch = new CountDownLatch(1);
+ private final CompletableFuture<Boolean> firstLoadFinishedSuccessfully = new CompletableFuture<>();
private EntryImpl(final T namespace, final Entry<T> entry, final CacheGenerator<T> cacheGenerator)
{
@@ -185,13 +188,14 @@ public final class CacheScheduler
private void updateCache()
{
+ boolean updatedCacheSuccessfully = false;
try {
// Ensures visibility of the whole EntryImpl's state (fields and their state).
startLatch.await();
CacheState currentCacheState = cacheStateHolder.get();
if (!Thread.currentThread().isInterrupted() && currentCacheState != NoCache.ENTRY_CLOSED) {
final String currentVersion = currentVersionOrNull(currentCacheState);
- tryUpdateCache(currentVersion);
+ updatedCacheSuccessfully = tryUpdateCache(currentVersion);
}
}
catch (Throwable t) {
@@ -205,9 +209,14 @@ public final class CacheScheduler
throw new RuntimeException(t);
}
}
+ finally {
+ if (!firstLoadFinishedSuccessfully.isDone()) {
+ firstLoadFinishedSuccessfully.complete(updatedCacheSuccessfully);
+ }
+ }
}
- private void tryUpdateCache(String currentVersion) throws Exception
+ private boolean tryUpdateCache(String currentVersion) throws Exception
{
boolean updatedCacheSuccessfully = false;
CacheHandler newCache = null;
@@ -253,6 +262,7 @@ public final class CacheScheduler
throw t;
}
}
+ return updatedCacheSuccessfully;
}
private String currentVersionOrNull(CacheState currentCacheState)
@@ -467,22 +477,31 @@ public final class CacheScheduler
@Nullable
public Entry scheduleAndWait(ExtractionNamespace namespace, long waitForFirstRunMs) throws InterruptedException
{
+ Exception loadException = null;
final Entry entry = schedule(namespace);
log.debug("Scheduled new %s", entry);
boolean success = false;
try {
- success = entry.impl.updateCounter.awaitFirstIncrement(waitForFirstRunMs, TimeUnit.MILLISECONDS);
+ success = (boolean) entry.impl.firstLoadFinishedSuccessfully.get(waitForFirstRunMs, TimeUnit.MILLISECONDS);
if (success) {
return entry;
} else {
return null;
}
}
+ catch (ExecutionException | TimeoutException e) {
+ loadException = e;
+ return null;
+ }
finally {
if (!success) {
// ExecutionException's cause is logged in entry.close()
entry.close();
- log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
+ if (loadException != null) {
+ log.error(loadException, "CacheScheduler[%s] - problem during start or waiting for the first run", entry);
+ } else {
+ log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
+ }
}
}
}
diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
index 51412d71e6..44289b0483 100644
--- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
+++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java
@@ -22,15 +22,20 @@ package org.apache.druid.server.lookup.namespace.cache;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
+import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
+import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
+import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Period;
@@ -54,6 +59,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -144,7 +150,9 @@ public class CacheSchedulerTest
new NoopServiceEmitter(),
ImmutableMap.of(
UriExtractionNamespace.class,
- cacheGenerator
+ cacheGenerator,
+ JdbcExtractionNamespace.class,
+ new JdbcCacheGenerator()
),
cacheManager
);
@@ -412,6 +420,62 @@ public class CacheSchedulerTest
Assert.assertEquals(0, scheduler.getActiveEntries());
}
+ @Test(timeout = 60_000L)
+ public void testSimpleSubmissionSuccessWithWait() throws InterruptedException
+ {
+ UriExtractionNamespace namespace = new UriExtractionNamespace(
+ tmpFile.toURI(),
+ null, null,
+ new UriExtractionNamespace.ObjectMapperFlatDataParser(
+ UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
+ ),
+ new Period(0),
+ null,
+ null
+ );
+ CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 10_000L);
+ waitFor(entry);
+ Assert.assertEquals(VALUE, entry.getCache().get(KEY));
+ }
+
+
+ @Test(timeout = 20_000L)
+ public void testSimpleSubmissionFailureWithWait() throws InterruptedException
+ {
+ JdbcExtractionNamespace namespace = new JdbcExtractionNamespace(
+ new MetadataStorageConnectorConfig()
+ {
+ @Override
+ public String getConnectURI()
+ {
+ return "jdbc:mysql://dummy:3306/db";
+ }
+ },
+ "foo",
+ "k",
+ "val",
+ "time",
+ "some filter",
+ new Period(10_000),
+ null,
+ new JdbcAccessSecurityConfig()
+ {
+ @Override
+ public Set<String> getAllowedProperties()
+ {
+ return ImmutableSet.of("valid_key1", "valid_key2");
+ }
+
+ @Override
+ public boolean isEnforceAllowedProperties()
+ {
+ return true;
+ }
+ }
+ );
+ scheduler.scheduleAndWait(namespace, 40_000L);
+ }
+
private void scheduleDanglingEntry() throws InterruptedException
{
CacheScheduler.Entry entry = scheduler.schedule(getUriExtractionNamespace(5));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org