Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/23 21:16:57 UTC

[GitHub] [druid] kroeders opened a new pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

kroeders opened a new pull request #10427:
URL: https://github.com/apache/druid/pull/10427


   ### Description
   
   This PR is an extension that addresses https://github.com/apache/druid/issues/10294, which describes how queries that use lookups can fail unnecessarily when a lookup is loaded on only some historicals. It also requires a small related change to core Druid, which is described in a separate PR.
   
   The servers to query for a particular segment are chosen in CachingClusteredClient by a ServerSelectorStrategy. This extension proposes a new FilteringServerSelectorStrategy. It filters the servers provided by the TierSelectorStrategy and then applies another ServerSelectorStrategy (such as the existing random or connection-count strategies) to the remaining servers. A new component, LookupStatusView, polls the nodeStatus endpoint on the coordinator for lookup status.
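The following is a minimal sketch of the filter-then-delegate composition described above. `ServerFilter`, `ServerPicker`, `FilteringSelectorSketch`, and the use of plain `host:port` strings for servers are hypothetical simplifications. They are not Druid's actual `ServerSelectorStrategy`, `TierSelectorStrategy`, or `QueryableDruidServer` APIs. The sketch fails when no server qualifies, which mirrors the `ISE` thrown by `LookupAwareFilterStrategy` in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical simplified types: real Druid strategies work with QueryableDruidServer and Query<T>.
interface ServerFilter
{
  Set<String> filter(Set<String> requiredLookups, Set<String> servers);
}

interface ServerPicker
{
  String pick(List<String> candidates);
}

final class FilteringSelectorSketch
{
  private final ServerFilter filter;
  private final ServerPicker delegate;

  FilteringSelectorSketch(ServerFilter filter, ServerPicker delegate)
  {
    this.filter = filter;
    this.delegate = delegate;
  }

  String pick(Set<String> requiredLookups, Set<String> servers)
  {
    // Step 1: narrow the candidates, e.g. to servers that have every required lookup loaded.
    Set<String> candidates = filter.filter(requiredLookups, servers);
    if (candidates.isEmpty()) {
      throw new IllegalStateException("Unable to find servers with all lookups " + requiredLookups);
    }
    // Step 2: let the wrapped policy (random, connection count, ...) choose among the survivors.
    // Sorting only makes this sketch deterministic.
    List<String> sorted = new ArrayList<>(candidates);
    Collections.sort(sorted);
    return delegate.pick(sorted);
  }

  // A lookup-aware filter backed by a host -> loaded-lookups snapshot.
  static ServerFilter lookupAware(Map<String, Set<String>> hostToLookups)
  {
    return (required, servers) -> servers.stream()
        .filter(s -> hostToLookups.getOrDefault(s, Collections.emptySet()).containsAll(required))
        .collect(Collectors.toSet());
  }
}
```

Because the filter and the final choice are separate, any existing selection policy can be reused unchanged.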
   
   This PR has:
   - [X] been self-reviewed.
   - [X] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] been tested in a test Druid cluster.
   
   ##### Key changed/added classes in this PR
    * `FilteringServerSelectorStrategy`
    * `LookupFilterServerSelectorModule`
    * `DelegateServerSelectorStrategy`
    * `LookupAwareFilterStrategy`
    * `LookupStatusView`
    * `ServerFilterStrategy`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kroeders commented on a change in pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on a change in pull request #10427:
URL: https://github.com/apache/druid/pull/10427#discussion_r493905533



##########
File path: extensions-contrib/lookup-aware-server-selector/src/main/java/org/apache/druid/client/selector/filter/lookup/LookupAwareFilterStrategy.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector.filter.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.client.selector.QueryableDruidServer;
+import org.apache.druid.client.selector.filter.ServerFilterStrategy;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.extraction.CascadeExtractionFn;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
+import org.apache.druid.query.search.SearchQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@JsonTypeName("lookupsLoaded")
+public class LookupAwareFilterStrategy implements ServerFilterStrategy
+{
+  public static final String LOOKUPS_CONTEXT_KEY = "lookups";
+  public static final String QUERY_LOOKUPS_CACHE_KEY = "__lookups";
+  private final LookupStatusView lookupStatusView;
+
+  @JsonCreator
+  public LookupAwareFilterStrategy(@JacksonInject LookupStatusView lookupStatusView)
+  {
+    this.lookupStatusView = lookupStatusView;
+  }
+
+  @Override
+  public <T> Set<QueryableDruidServer> filter(Query<T> query, Set<QueryableDruidServer> servers)
+  {
+    if (query == null) {
+      return servers;
+    }
+    final Set<String> lookups;
+    if (query.getContext() != null
+        && query.getContext().containsKey(QUERY_LOOKUPS_CACHE_KEY)) {
+      lookups = (Set<String>) query.getContext().get(QUERY_LOOKUPS_CACHE_KEY);
+    } else {
+      lookups = new HashSet<String>();
+      findLookupsInQuery(query, lookups);
+      if (query.getContext() != null) {
+        try {
+          query.getContext().put(QUERY_LOOKUPS_CACHE_KEY, lookups);
+        }
+        catch (UnsupportedOperationException e) {
+          // TODO This is for immutable contexts, what to do with it?
+        }
+      }
+    }
+    if (lookups.size() == 0) {
+      return servers;
+    }
+
+    Set<QueryableDruidServer> filteredServers = servers.stream()
+        .filter(server -> lookupStatusView.hasAllLookups(server.getServer().getHostAndPort().toString(), lookups))
+        .collect(Collectors.toSet());
+
+    if (filteredServers.size() == 0) {
+      throw new ISE("Unable to find servers with all lookups [%s]", lookups.stream().collect(Collectors.joining(",")));
+    }
+
+    return filteredServers;
+  }
+
+  private <T> Set<String> findLookupsInQuery(Query<T> query, Set<String> lookupsInQuery)
+  {
+    if (query.getContext() != null && query.getContext().containsKey(LOOKUPS_CONTEXT_KEY)) {
+      Object userLookups = query.getContext().get(LOOKUPS_CONTEXT_KEY);
+      if (userLookups instanceof Collection) {
+        lookupsInQuery.addAll(((Collection) userLookups));
+      }
+    }
+
+    findLookupsInDataSources(query.getDataSource(), lookupsInQuery);
+    findLookupsInDimensions(query, lookupsInQuery);
+    findLookupsInVirtualColumns(query, lookupsInQuery);
+
+    return lookupsInQuery;
+  }
+
+  public <T> void findLookupsInVirtualColumns(Query<T> query, Set<String> lookupsInQuery)
+  {
+    VirtualColumns virts = query.getVirtualColumns();
+    for (VirtualColumn vc : virts.getVirtualColumns()) {
+      if (vc instanceof ExpressionVirtualColumn) {
+        ExpressionVirtualColumn evc = (ExpressionVirtualColumn) vc;
+        Expr expr = evc.getParsedExpression().get();
+        Matcher m = Pattern.compile("lookup\\(.*?, \\'(.*?)\\'\\)").matcher(expr.stringify());

Review comment:
       I don't like using a regex here. Ideally, LookupExprMacro.LookupExpr would be promoted from a local class to something accessible, but I didn't want to include any other changes to core Druid in this PR. If the other commit gets into core, the rest of this functionality can be achieved with just this extension.
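For reference, here is a sketch of the regex approach with two small adjustments. First, the pattern is compiled once rather than once per virtual column. Second, matching loops so that every `lookup(...)` call in an expression is captured. The quoted code calls `m.find()` only once, so it records only the first match. `LookupExprScanner` is a hypothetical name. The pattern assumes the stringified form `lookup(<arg>, '<name>')` that the quoted regex expects.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LookupExprScanner
{
  // Compiled once. Assumes calls are stringified as lookup(<arg>, '<name>').
  private static final Pattern LOOKUP_CALL = Pattern.compile("lookup\\(.*?, '(.*?)'\\)");

  static Set<String> findLookups(String stringifiedExpr)
  {
    Set<String> names = new LinkedHashSet<>();
    Matcher m = LOOKUP_CALL.matcher(stringifiedExpr);
    // Loop so that every lookup() call is captured, not only the first match.
    while (m.find()) {
      names.add(m.group(1));
    }
    return names;
  }
}
```

This remains a heuristic. For example, with a nested call such as `lookup(lookup(d, 'a'), 'b')`, the lazy match consumes the inner call and only `a` is found. That is one reason direct access to `LookupExpr` would be preferable.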






[GitHub] [druid] kroeders commented on a change in pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on a change in pull request #10427:
URL: https://github.com/apache/druid/pull/10427#discussion_r493904523



##########
File path: extensions-contrib/lookup-aware-server-selector/src/main/java/org/apache/druid/client/selector/filter/lookup/LookupAwareFilterStrategy.java
##########
@@ -0,0 +1,181 @@
+
+package org.apache.druid.client.selector.filter.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.client.selector.QueryableDruidServer;
+import org.apache.druid.client.selector.filter.ServerFilterStrategy;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.extraction.CascadeExtractionFn;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
+import org.apache.druid.query.search.SearchQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@JsonTypeName("lookupsLoaded")
+public class LookupAwareFilterStrategy implements ServerFilterStrategy
+{
+  public static final String LOOKUPS_CONTEXT_KEY = "lookups";
+  public static final String QUERY_LOOKUPS_CACHE_KEY = "__lookups";
+  private final LookupStatusView lookupStatusView;
+
+  @JsonCreator
+  public LookupAwareFilterStrategy(@JacksonInject LookupStatusView lookupStatusView)
+  {
+    this.lookupStatusView = lookupStatusView;
+  }
+
+  @Override
+  public <T> Set<QueryableDruidServer> filter(Query<T> query, Set<QueryableDruidServer> servers)
+  {
+    if (query == null) {
+      return servers;
+    }
+    final Set<String> lookups;
+    if (query.getContext() != null
+        && query.getContext().containsKey(QUERY_LOOKUPS_CACHE_KEY)) {
+      lookups = (Set<String>) query.getContext().get(QUERY_LOOKUPS_CACHE_KEY);
+    } else {
+      lookups = new HashSet<String>();
+      findLookupsInQuery(query, lookups);
+      if (query.getContext() != null) {
+        try {
+          query.getContext().put(QUERY_LOOKUPS_CACHE_KEY, lookups);
+        }
+        catch (UnsupportedOperationException e) {
+          // TODO This is for immutable contexts, what to do with it?
+        }
+      }
+    }
+    if (lookups.size() == 0) {
+      return servers;
+    }
+
+    Set<QueryableDruidServer> filteredServers = servers.stream()
+        .filter(server -> lookupStatusView.hasAllLookups(server.getServer().getHostAndPort().toString(), lookups))
+        .collect(Collectors.toSet());
+
+    if (filteredServers.size() == 0) {
+      throw new ISE("Unable to find servers with all lookups [%s]", lookups.stream().collect(Collectors.joining(",")));
+    }
+
+    return filteredServers;
+  }
+
+  private <T> Set<String> findLookupsInQuery(Query<T> query, Set<String> lookupsInQuery)
+  {
+    if (query.getContext() != null && query.getContext().containsKey(LOOKUPS_CONTEXT_KEY)) {
+      Object userLookups = query.getContext().get(LOOKUPS_CONTEXT_KEY);
+      if (userLookups instanceof Collection) {
+        lookupsInQuery.addAll(((Collection) userLookups));
+      }
+    }
+
+    findLookupsInDataSources(query.getDataSource(), lookupsInQuery);
+    findLookupsInDimensions(query, lookupsInQuery);
+    findLookupsInVirtualColumns(query, lookupsInQuery);
+
+    return lookupsInQuery;
+  }
+
+  public <T> void findLookupsInVirtualColumns(Query<T> query, Set<String> lookupsInQuery)
+  {
+    VirtualColumns virts = query.getVirtualColumns();
+    for (VirtualColumn vc : virts.getVirtualColumns()) {
+      if (vc instanceof ExpressionVirtualColumn) {
+        ExpressionVirtualColumn evc = (ExpressionVirtualColumn) vc;
+        Expr expr = evc.getParsedExpression().get();
+        Matcher m = Pattern.compile("lookup\\(.*?, \\'(.*?)\\'\\)").matcher(expr.stringify());
+        if (m.find()) {
+          lookupsInQuery.add(m.group(1));
+        }
+      }
+    }
+  }
+
+  public void findLookupsInDataSources(DataSource dataSource, Set<String> lookupsInQuery)
+  {
+    if (dataSource instanceof LookupDataSource) {

Review comment:
       This still needs some work: lookups that are only used on the broker aren't needed here. I don't think a lookup-only query will ever reach this point, though.
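The sketch below shows one way to apply this. It assumes, as the comment suggests, that a query whose top-level data source is itself a lookup never needs data servers, while lookups joined against a table do. `DsNode`, `TableNode`, `LookupNode`, `JoinNode`, and `DataSourceLookupScanner` are hypothetical simplifications of Druid's `DataSource` hierarchy. The recursion over join sides follows `findLookupsInDataSources` in the diff.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified data source tree (not Druid's real classes).
interface DsNode {}

final class TableNode implements DsNode
{
  final String name;
  TableNode(String name) { this.name = name; }
}

final class LookupNode implements DsNode
{
  final String lookupName;
  LookupNode(String lookupName) { this.lookupName = lookupName; }
}

final class JoinNode implements DsNode
{
  final DsNode left;
  final DsNode right;
  JoinNode(DsNode left, DsNode right) { this.left = left; this.right = right; }
}

final class DataSourceLookupScanner
{
  // Collects lookups that data servers would need. A bare top-level lookup is skipped,
  // on the assumption that lookup-only queries are answered without data servers.
  static Set<String> lookupsNeededOnDataServers(DsNode top)
  {
    Set<String> out = new LinkedHashSet<>();
    if (!(top instanceof LookupNode)) {
      collect(top, out);
    }
    return out;
  }

  private static void collect(DsNode ds, Set<String> out)
  {
    if (ds instanceof LookupNode) {
      out.add(((LookupNode) ds).lookupName);
    } else if (ds instanceof JoinNode) {
      JoinNode join = (JoinNode) ds;
      collect(join.left, out);
      collect(join.right, out);
    }
  }
}
```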






[GitHub] [druid] kroeders commented on a change in pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on a change in pull request #10427:
URL: https://github.com/apache/druid/pull/10427#discussion_r493905669



##########
File path: extensions-contrib/lookup-aware-server-selector/src/main/java/org/apache/druid/client/selector/filter/lookup/LookupAwareFilterStrategy.java
##########
@@ -0,0 +1,181 @@
+
+package org.apache.druid.client.selector.filter.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.client.selector.QueryableDruidServer;
+import org.apache.druid.client.selector.filter.ServerFilterStrategy;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.extraction.CascadeExtractionFn;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
+import org.apache.druid.query.search.SearchQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@JsonTypeName("lookupsLoaded")
+public class LookupAwareFilterStrategy implements ServerFilterStrategy
+{
+  public static final String LOOKUPS_CONTEXT_KEY = "lookups";
+  public static final String QUERY_LOOKUPS_CACHE_KEY = "__lookups";
+  private final LookupStatusView lookupStatusView;
+
+  @JsonCreator
+  public LookupAwareFilterStrategy(@JacksonInject LookupStatusView lookupStatusView)
+  {
+    this.lookupStatusView = lookupStatusView;
+  }
+
+  @Override
+  public <T> Set<QueryableDruidServer> filter(Query<T> query, Set<QueryableDruidServer> servers)
+  {
+    if (query == null) {
+      return servers;
+    }
+    final Set<String> lookups;
+    if (query.getContext() != null
+        && query.getContext().containsKey(QUERY_LOOKUPS_CACHE_KEY)) {
+      lookups = (Set<String>) query.getContext().get(QUERY_LOOKUPS_CACHE_KEY);
+    } else {
+      lookups = new HashSet<String>();
+      findLookupsInQuery(query, lookups);
+      if (query.getContext() != null) {
+        try {
+          query.getContext().put(QUERY_LOOKUPS_CACHE_KEY, lookups);
+        }
+        catch (UnsupportedOperationException e) {
+          // TODO This is for immutable contexts, what to do with it?
+        }
+      }
+    }
+    if (lookups.size() == 0) {
+      return servers;
+    }
+
+    Set<QueryableDruidServer> filteredServers = servers.stream()
+        .filter(server -> lookupStatusView.hasAllLookups(server.getServer().getHostAndPort().toString(), lookups))
+        .collect(Collectors.toSet());
+
+    if (filteredServers.size() == 0) {
+      throw new ISE("Unable to find servers with all lookups [%s]", lookups.stream().collect(Collectors.joining(",")));
+    }
+
+    return filteredServers;
+  }
+
+  private <T> Set<String> findLookupsInQuery(Query<T> query, Set<String> lookupsInQuery)
+  {
+    if (query.getContext() != null && query.getContext().containsKey(LOOKUPS_CONTEXT_KEY)) {
+      Object userLookups = query.getContext().get(LOOKUPS_CONTEXT_KEY);
+      if (userLookups instanceof Collection) {
+        lookupsInQuery.addAll(((Collection) userLookups));
+      }
+    }
+
+    findLookupsInDataSources(query.getDataSource(), lookupsInQuery);
+    findLookupsInDimensions(query, lookupsInQuery);
+    findLookupsInVirtualColumns(query, lookupsInQuery);
+
+    return lookupsInQuery;
+  }
+
+  public <T> void findLookupsInVirtualColumns(Query<T> query, Set<String> lookupsInQuery)
+  {
+    VirtualColumns virts = query.getVirtualColumns();
+    for (VirtualColumn vc : virts.getVirtualColumns()) {
+      if (vc instanceof ExpressionVirtualColumn) {
+        ExpressionVirtualColumn evc = (ExpressionVirtualColumn) vc;
+        Expr expr = evc.getParsedExpression().get();
+        Matcher m = Pattern.compile("lookup\\(.*?, \\'(.*?)\\'\\)").matcher(expr.stringify());
+        if (m.find()) {
+          lookupsInQuery.add(m.group(1));
+        }
+      }
+    }
+  }
+
+  public void findLookupsInDataSources(DataSource dataSource, Set<String> lookupsInQuery)
+  {
+    if (dataSource instanceof LookupDataSource) {
+      lookupsInQuery.add(((LookupDataSource) dataSource).getLookupName());
+    }
+    if (dataSource instanceof JoinDataSource) {
+      JoinDataSource jds = (JoinDataSource) dataSource;
+      findLookupsInDataSources(jds.getLeft(), lookupsInQuery);
+      findLookupsInDataSources(jds.getRight(), lookupsInQuery);
+    }
+  }
+
+  public <T> void findLookupsInDimensions(Query<T> query, Set<String> lookupsInQuery)
+  {
+    if (query instanceof TopNQuery) {
+      TopNQuery tnq = (TopNQuery) (query);
+      addLookupsFromDimension(tnq.getDimensionSpec(), lookupsInQuery);
+    } else if (query instanceof GroupByQuery) {
+      GroupByQuery gbq = (GroupByQuery) (query);
+      for (DimensionSpec d : gbq.getDimensions()) {
+        addLookupsFromDimension(d, lookupsInQuery);
+      }
+    } else if (query instanceof SearchQuery) {
+      SearchQuery gbq = (SearchQuery) (query);
+      for (DimensionSpec d : gbq.getDimensions()) {

Review comment:
       Dimensions are defined separately in each query subclass; there's no common interface for them at the moment.
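Without a shared interface, each query type needs its own handling. One possible alternative to the `instanceof` chain, which also avoids touching core Druid, is a registry keyed by query class. `Q`, `TopN`, `GroupBy`, and `DimensionRegistry` below are hypothetical stand-ins, not Druid's `TopNQuery`, `GroupByQuery`, or `SearchQuery`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for query types that expose their dimensions differently.
interface Q {}

final class TopN implements Q
{
  final String dimension;
  TopN(String dimension) { this.dimension = dimension; }
}

final class GroupBy implements Q
{
  final List<String> dimensions;
  GroupBy(List<String> dimensions) { this.dimensions = dimensions; }
}

final class DimensionRegistry
{
  // Each query class registers how to read its dimensions; unregistered types yield none.
  private final Map<Class<?>, Function<Q, List<String>>> extractors = new LinkedHashMap<>();

  <T extends Q> void register(Class<T> type, Function<T, List<String>> getter)
  {
    extractors.put(type, q -> getter.apply(type.cast(q)));
  }

  List<String> dimensionsOf(Q query)
  {
    Function<Q, List<String>> f = extractors.get(query.getClass());
    return f == null ? Collections.emptyList() : f.apply(query);
  }
}
```

The lookup is by exact runtime class, so subclasses must be registered separately. An `instanceof` chain matches them automatically.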






[GitHub] [druid] kroeders commented on a change in pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on a change in pull request #10427:
URL: https://github.com/apache/druid/pull/10427#discussion_r493906766



##########
File path: extensions-contrib/lookup-aware-server-selector/src/main/java/org/apache/druid/client/selector/filter/lookup/LookupStatusView.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector.filter.lookup;
+
+import com.google.common.net.HostAndPort;
+import com.google.inject.Inject;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.guice.ManageLifecycleAnnouncements;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.query.lookup.LookupsState;
+import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycleAnnouncements

Review comment:
       This was added because there was an issue injecting the Coordinator client. Adding it seemed to help, and I never saw that issue when the code was part of the broker's core code.
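The lifecycle these annotations manage can be sketched without Druid's Guice and lifecycle machinery. The pattern is: start a single-threaded poller, publish each result as a fresh snapshot through a `volatile` field, and stop the executor on shutdown. `PollingLookupView` and its `Supplier`-based fetcher, which stands in for the coordinator client, are hypothetical. In the diff, `LookupStatusView` calls `lifecycleLock.awaitStarted()` inside `hasAllLookups`. This sketch instead blocks in `start()` until the first poll completes.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class PollingLookupView
{
  private final Supplier<Map<String, Set<String>>> fetcher; // hypothetical stand-in for the coordinator client
  private final long periodMillis;
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final CountDownLatch firstPoll = new CountDownLatch(1);
  // Replaced wholesale on each poll, so readers always see a complete snapshot.
  private volatile Map<String, Set<String>> hostToLookups = Collections.emptyMap();

  PollingLookupView(Supplier<Map<String, Set<String>>> fetcher, long periodMillis)
  {
    this.fetcher = fetcher;
    this.periodMillis = periodMillis;
  }

  void start() throws InterruptedException
  {
    exec.scheduleAtFixedRate(() -> {
      try {
        hostToLookups = Collections.unmodifiableMap(fetcher.get());
        firstPoll.countDown();
      }
      catch (RuntimeException e) {
        // Keep the previous snapshot; letting this escape would cancel all future runs.
      }
    }, 0, periodMillis, TimeUnit.MILLISECONDS);
    // Bounded wait for the first snapshot; the boolean result is ignored in this sketch.
    firstPoll.await(10, TimeUnit.SECONDS);
  }

  void stop()
  {
    exec.shutdownNow();
  }

  boolean hasAllLookups(String host, Set<String> lookups)
  {
    Set<String> loaded = hostToLookups.get(host);
    return loaded != null && loaded.containsAll(lookups);
  }
}
```

Catching exceptions inside the task matters because `scheduleAtFixedRate` suppresses all subsequent runs once a run throws.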






[GitHub] [druid] kroeders commented on pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on pull request #10427:
URL: https://github.com/apache/druid/pull/10427#issuecomment-713072808


   I ran the extension on a cluster with a datasource of around 13,000 segments. Across several hundred queries, the random server selector averaged 8.33 ms, while the lookup-aware selector averaged around 13.87 ms. Since overall query execution time was on the order of several seconds, these initial results indicate a negligible performance impact from adding this selector.
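To put these averages in proportion, the added selection cost per query is their difference. The query time $T_{\text{query}} = 2\,\text{s}$ below is an illustrative value within "several seconds", not a measured figure.

```latex
\Delta t = 13.87\,\text{ms} - 8.33\,\text{ms} = 5.54\,\text{ms},
\qquad \frac{13.87}{8.33} \approx 1.67

\frac{\Delta t}{T_{\text{query}}} = \frac{5.54\,\text{ms}}{2000\,\text{ms}} \approx 0.28\%
```

The lookup-aware selector costs about 1.67 times as much as random selection. For a query of that length, however, the extra cost is well under one percent of end-to-end query time.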




[GitHub] [druid] kroeders commented on a change in pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on a change in pull request #10427:
URL: https://github.com/apache/druid/pull/10427#discussion_r493906167



##########
File path: extensions-contrib/lookup-aware-server-selector/src/main/java/org/apache/druid/client/selector/filter/lookup/LookupStatusView.java
##########
@@ -0,0 +1,113 @@
+
+package org.apache.druid.client.selector.filter.lookup;
+
+import com.google.common.net.HostAndPort;
+import com.google.inject.Inject;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.guice.ManageLifecycleAnnouncements;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.query.lookup.LookupsState;
+import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ManageLifecycleAnnouncements
+public class LookupStatusView
+{
+  private final LookupsCoordinatorClient client;
+  private final ScheduledExecutorService scheduledExec;
+  private final LookupStatusViewConfig config;
+  private volatile Map<String, Set<String>> hostToLookups = new HashMap<String, Set<String>>();
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+  @Inject
+  public LookupStatusView(LookupsCoordinatorClient client, LookupStatusViewConfig config)
+  {
+    this.client = client;
+    this.config = config;
+    this.scheduledExec = Execs.scheduledSingleThreaded("BrokerServerView-Lookups--%d");
+  }
+
+  public boolean hasAllLookups(String server, Set<String> lookups)
+  {
+    lifecycleLock.awaitStarted();
+    Set<String> serverLookups = hostToLookups.get(server);
+    return serverLookups != null && serverLookups.containsAll(lookups);
+  }
+
+  @LifecycleStart
+  public void start() throws InterruptedException
+  {
+    if (!lifecycleLock.canStart()) {
+      throw new ISE("can't start.");
+    }
+    
+    try {
+      CountDownLatch setupLatch = new CountDownLatch(1);
+      scheduledExec.scheduleAtFixedRate(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> lookups = client
+              .fetchLookupNodeStatus();
+          Map<String, Set<String>> loadedHostToLookups = new HashMap<String, Set<String>>();

Review comment:
       I think this should be updated to make sure that every server has the latest version of a lookup, not just a lookup with the right name. I'm working on this now.
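Here is a sketch of the version-aware check described in this comment. A host qualifies only if, for every required lookup, its loaded version equals the version the coordinator currently expects. The map shapes (lookup name to version string) and `VersionAwareLookupCheck` are assumptions for illustration. They do not reflect the structure of `LookupsState` or `LookupExtractorFactoryMapContainer`.

```java
import java.util.Collections;
import java.util.Map;

final class VersionAwareLookupCheck
{
  /**
   * Returns true only if the host has every required lookup loaded at the version the
   * coordinator currently expects. Maps are hypothetical: lookup name -> version string.
   */
  static boolean hasLatestLookups(
      Map<String, Map<String, String>> hostToLoadedVersions,
      Map<String, String> expectedVersions,
      String host,
      Iterable<String> requiredLookups
  )
  {
    Map<String, String> loaded = hostToLoadedVersions.getOrDefault(host, Collections.emptyMap());
    for (String name : requiredLookups) {
      String expected = expectedVersions.get(name);
      // A name match alone is not enough: the loaded version must equal the expected one.
      if (expected == null || !expected.equals(loaded.get(name))) {
        return false;
      }
    }
    return true;
  }
}
```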






[GitHub] [druid] kroeders commented on a change in pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on a change in pull request #10427:
URL: https://github.com/apache/druid/pull/10427#discussion_r493903568



##########
File path: extensions-contrib/lookup-aware-server-selector/src/main/java/org/apache/druid/client/selector/filter/lookup/LookupAwareFilterStrategy.java
##########
@@ -0,0 +1,181 @@
+
+package org.apache.druid.client.selector.filter.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.client.selector.QueryableDruidServer;
+import org.apache.druid.client.selector.filter.ServerFilterStrategy;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.extraction.CascadeExtractionFn;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
+import org.apache.druid.query.search.SearchQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@JsonTypeName("lookupsLoaded")
+public class LookupAwareFilterStrategy implements ServerFilterStrategy
+{
+  public static final String LOOKUPS_CONTEXT_KEY = "lookups";
+  public static final String QUERY_LOOKUPS_CACHE_KEY = "__lookups";
+  private final LookupStatusView lookupStatusView;
+
+  @JsonCreator
+  public LookupAwareFilterStrategy(@JacksonInject LookupStatusView lookupStatusView)
+  {
+    this.lookupStatusView = lookupStatusView;
+  }
+
+  @Override
+  public <T> Set<QueryableDruidServer> filter(Query<T> query, Set<QueryableDruidServer> servers)
+  {
+    if (query == null) {
+      return servers;
+    }
+    final Set<String> lookups;
+    if (query.getContext() != null

Review comment:
       I'm not sure about this use of the query context, but I would prefer not to recompute the associated lookups on every selection. The withOverriddenContext methods return a new Query, so maybe there is some other way to cache them? I was also wondering whether it would be better to compute a blocklist of servers once and then make the FilteringServerSelectorStrategy operate on that list, which would allow stacking filters to handle other resiliency issues. 
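One way to read the blocklist idea, as a minimal sketch:

- Each filter reports the servers it wants excluded, once per query.
- Those sets are unioned.
- Per-segment selection only subtracts the union before delegating to the random or connection-count strategy.

`ServerBlocklist`, `StackedServerFilter`, and the modelling of servers as host strings are hypothetical. They are not Druid's `ServerFilterStrategy` API. Falling back to the unfiltered set when every candidate is blocked is also an assumed policy.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not Druid's ServerFilterStrategy interface.
interface ServerBlocklist<Q>
{
  /** Servers (as host strings) that must not be used for this query. */
  Set<String> blockedServers(Q query);
}

final class StackedServerFilter<Q>
{
  private final List<ServerBlocklist<Q>> filters;

  StackedServerFilter(List<ServerBlocklist<Q>> filters)
  {
    this.filters = new ArrayList<>(filters);
  }

  /** Intended to be called once per query: unions every filter's blocklist. */
  Set<String> blockedFor(Q query)
  {
    Set<String> blocked = new HashSet<>();
    for (ServerBlocklist<Q> filter : filters) {
      blocked.addAll(filter.blockedServers(query));
    }
    return blocked;
  }

  /**
   * Cheap per-segment step: removes blocked candidates. If nothing survives,
   * returns the original set (assumed fallback) so behavior matches no filtering.
   */
  static Set<String> candidates(Set<String> servers, Set<String> blocked)
  {
    Set<String> remaining = new HashSet<>(servers);
    remaining.removeAll(blocked);
    return remaining.isEmpty() ? servers : remaining;
  }
}
```

Because `blockedFor` runs once per query, the lookups a query needs would be derived a single time rather than on every segment's server selection. Adding another resiliency filter then means appending one more `ServerBlocklist` to the list.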






[GitHub] [druid] kroeders closed pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders closed pull request #10427:
URL: https://github.com/apache/druid/pull/10427


   




[GitHub] [druid] kroeders commented on pull request #10427: ServerSelectorStrategy to filter servers with missing required lookups

Posted by GitBox <gi...@apache.org>.
kroeders commented on pull request #10427:
URL: https://github.com/apache/druid/pull/10427#issuecomment-729754115


   reopening to restart travis-ci

